Enhance dynamic task watcher functionality

improvement/fix-task-deletion
Charitha Goonetilleke 2 years ago
parent 78f8e80718
commit 1db4b4095e

@ -77,7 +77,8 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
// during the server startup // during the server startup
if (localHashIndex == null ) { if (localHashIndex == null ) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Executing startup scheduled task (" + getTaskName() + ")"); log.debug("Executing startup scheduled task (" + getTaskName() + ") with class: " +
this.getClass().getName());
} }
executeDynamicTask(); executeDynamicTask();
return; return;

@ -27,7 +27,7 @@ public class TaskManagerDataHolder {
private HeartBeatManagementService heartBeatService; private HeartBeatManagementService heartBeatService;
private static TaskManagerDataHolder thisInstance = new TaskManagerDataHolder(); private static final TaskManagerDataHolder thisInstance = new TaskManagerDataHolder();
private TaskManagerDataHolder() { private TaskManagerDataHolder() {
} }

@ -293,7 +293,7 @@ public class TaskManagementServiceImpl implements TaskManagementService {
@Override @Override
public List<DynamicTask> getAllDynamicTasks() throws TaskManagementException { public List<DynamicTask> getAllDynamicTasks() throws TaskManagementException {
List<DynamicTask> dynamicTasks = null; List<DynamicTask> dynamicTasks;
try { try {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Fetching the details of all dynamic tasks"); log.debug("Fetching the details of all dynamic tasks");
@ -324,7 +324,7 @@ public class TaskManagementServiceImpl implements TaskManagementService {
@Override @Override
public DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementException { public DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementException {
DynamicTask dynamicTask = null; DynamicTask dynamicTask;
try { try {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Fetching the details of dynamic task '" + dynamicTaskId + "'"); log.debug("Fetching the details of dynamic task '" + dynamicTaskId + "'");
@ -352,7 +352,7 @@ public class TaskManagementServiceImpl implements TaskManagementService {
@Override @Override
public List<DynamicTask> getActiveDynamicTasks() throws TaskManagementException { public List<DynamicTask> getActiveDynamicTasks() throws TaskManagementException {
List<DynamicTask> dynamicTasks = null; List<DynamicTask> dynamicTasks;
try { try {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Fetching the details of all active dynamic tasks"); log.debug("Fetching the details of all active dynamic tasks");

@ -52,13 +52,18 @@
<Bundle-Description>Task Watcher Bundle</Bundle-Description> <Bundle-Description>Task Watcher Bundle</Bundle-Description>
<Private-Package>io.entgra.task.mgt.watcher.internal</Private-Package> <Private-Package>io.entgra.task.mgt.watcher.internal</Private-Package>
<Import-Package> <Import-Package>
org.osgi.framework.*;version="${imp.package.version.osgi.framework}", io.entgra.server.bootup.heartbeat.beacon.*,
org.osgi.service.*;version="${imp.package.version.osgi.service}",
org.apache.commons.logging,
org.wso2.carbon.ntask.*,
io.entgra.task.mgt.common.*, io.entgra.task.mgt.common.*,
io.entgra.task.mgt.core.*, io.entgra.task.mgt.core.*,
org.apache.commons.logging,
org.osgi.framework.*;version="${imp.package.version.osgi.framework}",
org.osgi.service.*;version="${imp.package.version.osgi.service}",
org.wso2.carbon.context,
org.wso2.carbon.core, org.wso2.carbon.core,
org.wso2.carbon.device.mgt.common.*,
org.wso2.carbon.ntask.*,
org.wso2.carbon.user.api,
org.wso2.carbon.user.core.*,
</Import-Package> </Import-Package>
<Export-Package> <Export-Package>
io.entgra.task.mgt.watcher.* io.entgra.task.mgt.watcher.*
@ -95,14 +100,40 @@
<artifactId>io.entgra.task.mgt.common</artifactId> <artifactId>io.entgra.task.mgt.common</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.device.mgt.common</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon.devicemgt</groupId> <groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.device.mgt.core</artifactId> <artifactId>org.wso2.carbon.device.mgt.core</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>io.entgra.server.bootup.heartbeat.beacon</artifactId>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.wso2.carbon</groupId> <groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.core</artifactId> <artifactId>org.wso2.carbon.core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.user.core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.user.api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.utils</artifactId>
<scope>provided</scope>
</dependency> </dependency>
</dependencies> </dependencies>

@ -18,7 +18,7 @@
package io.entgra.task.mgt.watcher; package io.entgra.task.mgt.watcher;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.task.mgt.common.bean.DynamicTask; import io.entgra.task.mgt.common.bean.DynamicTask;
import io.entgra.task.mgt.common.constant.TaskMgtConstants; import io.entgra.task.mgt.common.constant.TaskMgtConstants;
import io.entgra.task.mgt.common.exception.TaskManagementException; import io.entgra.task.mgt.common.exception.TaskManagementException;
@ -26,13 +26,21 @@ import io.entgra.task.mgt.core.util.TaskManagementUtil;
import io.entgra.task.mgt.watcher.internal.TaskWatcherDataHolder; import io.entgra.task.mgt.watcher.internal.TaskWatcherDataHolder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.core.ServerStartupObserver; import org.wso2.carbon.core.ServerStartupObserver;
import org.wso2.carbon.ntask.common.TaskException; import org.wso2.carbon.ntask.common.TaskException;
import org.wso2.carbon.ntask.core.TaskInfo; import org.wso2.carbon.ntask.core.TaskInfo;
import org.wso2.carbon.ntask.core.TaskManager; import org.wso2.carbon.ntask.core.TaskManager;
import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.ntask.core.service.TaskService;
import org.wso2.carbon.user.api.Tenant;
import org.wso2.carbon.user.api.UserStoreException;
import org.wso2.carbon.user.core.service.RealmService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
@ -49,93 +57,169 @@ public class IoTSStartupHandler implements ServerStartupObserver {
timer.schedule(new TimerTask() { timer.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
compareTasks(); try {
compareTasks();
} catch (Exception e) {
log.error("Error occurred when comparing tasks.", e);
}
} }
}, 200000, 600000); }, 200000, 600000);
} }
private void compareTasks() { private void compareTasks() {
log.info("Comparing Tasks from carbon n task manager and engtra task manager"); if (log.isDebugEnabled()) {
TaskManager taskManager = null; log.debug("Comparing Tasks from carbon nTask manager and entgra task manager");
}
TaskManager taskManager;
TaskService nTaskService = TaskWatcherDataHolder.getInstance().getnTaskService(); TaskService nTaskService = TaskWatcherDataHolder.getInstance().getnTaskService();
if (nTaskService == null) { if (nTaskService == null) {
String msg = "Unable to load TaskService from the carbon n task core"; String msg = "Unable to load TaskService from the carbon nTask core";
log.error(msg); log.error(msg);
return;
} }
try { try {
if (!nTaskService.getRegisteredTaskTypes().contains(
TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
}
taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
List<DynamicTask> dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService() List<DynamicTask> dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService()
.getAllDynamicTasks(); .getAllDynamicTasks();
List<TaskInfo> tasks = taskManager.getAllTasks(); Map<Integer, List<DynamicTask>> tenantedDynamicTasks = new HashMap<>();
// add or update task into n task core List<DynamicTask> dts;
for (DynamicTask dt : dynamicTasks) { for (DynamicTask dt : dynamicTasks) {
String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()); if (tenantedDynamicTasks.containsKey(dt.getTenantId())) {
boolean isExist = false; dts = tenantedDynamicTasks.get(dt.getTenantId());
for (TaskInfo taskInfo : tasks) { } else {
if (taskInfo.getName().equals(generatedTaskId)) { dts = new ArrayList<>();
isExist = true; }
TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo(); dts.add(dt);
String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(dt.getProperties()); tenantedDynamicTasks.put(dt.getTenantId(), dts);
String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties()); }
if (!triggerInfo.getCronExpression().equals(dt.getCronExpression()) for (Integer tenantId : tenantedDynamicTasks.keySet()) {
|| !dynamicTaskPropMD5.equals(existingTaskPropMD5)) { if (tenantId == -1) {
triggerInfo.setCronExpression(dt.getCronExpression()); log.warn("Found " + tenantedDynamicTasks.get(tenantId).size() +
taskInfo.setTriggerInfo(triggerInfo); " invalid tasks without a valid tenant id.");
taskInfo.setProperties(dt.getProperties()); continue;
taskManager.registerTask(taskInfo); }
taskManager.rescheduleTask(generatedTaskId); PrivilegedCarbonContext.startTenantFlow();
log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table"); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenantId, true);
if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
}
taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
List<TaskInfo> tasks = taskManager.getAllTasks();
// add or update task into nTask core
for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) {
String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId());
Map<String, String> taskProperties = dt.getProperties();
try {
int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx();
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId);
} catch (HeartBeatManagementException e) {
String msg = "Unexpected exception when getting server hash index.";
log.error(msg, e);
throw new TaskManagementException(msg, e);
}
boolean isExist = false;
for (TaskInfo taskInfo : tasks) {
if (taskInfo.getName().equals(generatedTaskId)) {
isExist = true;
TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo();
String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskProperties);
String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties());
if (!triggerInfo.getCronExpression().equals(dt.getCronExpression())
|| !dynamicTaskPropMD5.equals(existingTaskPropMD5)) {
triggerInfo.setCronExpression(dt.getCronExpression());
taskInfo.setTriggerInfo(triggerInfo);
taskInfo.setProperties(taskProperties);
taskManager.registerTask(taskInfo);
taskManager.rescheduleTask(generatedTaskId);
if (log.isDebugEnabled()) {
log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table");
}
}
if (dt.isEnabled()
&& taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) {
taskManager.resumeTask(generatedTaskId);
if (log.isDebugEnabled()) {
log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table");
}
} else if (!dt.isEnabled()
&& taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) {
taskManager.pauseTask(generatedTaskId);
if (log.isDebugEnabled()) {
log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table");
}
}
break;
} }
if (dt.isEnabled() }
&& taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) { if (!isExist) {
taskManager.resumeTask(generatedTaskId); TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table"); triggerInfo.setCronExpression(dt.getCronExpression());
} else if (!dt.isEnabled() TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(),
&& taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) { taskProperties, triggerInfo);
taskManager.pauseTask(generatedTaskId); taskManager.registerTask(taskInfo);
log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table"); taskManager.scheduleTask(generatedTaskId);
if (log.isDebugEnabled()) {
log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table");
} }
break;
} }
} }
if (!isExist) { PrivilegedCarbonContext.endTenantFlow();
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); }
triggerInfo.setCronExpression(dt.getCronExpression());
TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(), List<Tenant> tenants = new ArrayList<>();
dt.getProperties(), triggerInfo); try {
taskManager.registerTask(taskInfo); RealmService realmService = TaskWatcherDataHolder.getInstance().getRealmService();
taskManager.scheduleTask(generatedTaskId); Tenant[] tenantArray = realmService.getTenantManager().getAllTenants();
log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table"); if (tenantArray != null && tenantArray.length != 0) {
tenants.addAll(Arrays.asList(tenantArray));
} }
Tenant superTenant = new Tenant();
superTenant.setId(-1234);
tenants.add(superTenant);
} catch (UserStoreException e) {
String msg = "Unable to load tenants";
log.error(msg, e);
return;
} }
// Remove deleted items from the n task core for (Tenant tenant : tenants) {
for (TaskInfo taskInfo : tasks) { PrivilegedCarbonContext.startTenantFlow();
boolean isExist = false; PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true);
for (DynamicTask dt : dynamicTasks) { if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
if (taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()))) { nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
isExist = true;
}
} }
if (!isExist) { taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
taskManager.deleteTask(taskInfo.getName()); List<TaskInfo> tasks = taskManager.getAllTasks();
log.debug("Task '" + taskInfo.getName() + "' deleted according to the dynamic task table"); // Remove deleted items from the nTask core
for (TaskInfo taskInfo : tasks) {
boolean isExist = false;
for (DynamicTask dt : dynamicTasks) {
if (tenant.getId() == dt.getTenantId() &&
taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()))) {
isExist = true;
break;
}
}
if (!isExist) {
taskManager.deleteTask(taskInfo.getName());
if (log.isDebugEnabled()) {
log.debug("Task '" + taskInfo.getName() + "' deleted according to the dynamic task table");
}
}
} }
PrivilegedCarbonContext.endTenantFlow();
} }
log.info("Task Comparison Completed and all tasks in current node are updated");
} catch ( if (log.isDebugEnabled()) {
TaskException e) { log.debug("Task Comparison Completed and all tasks in current node are updated");
String msg = "Error occurred while accessing carbon n task manager."; }
log.error(msg); } catch (TaskException e) {
} catch ( String msg = "Error occurred while accessing carbon nTask manager.";
TaskManagementException e) { log.error(msg, e);
} catch (TaskManagementException e) {
String msg = "Error occurred while retrieving all active tasks from entgra task manager"; String msg = "Error occurred while retrieving all active tasks from entgra task manager";
log.error(msg); log.error(msg, e);
} }
} }

@ -17,16 +17,20 @@
*/ */
package io.entgra.task.mgt.watcher.internal; package io.entgra.task.mgt.watcher.internal;
import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import io.entgra.task.mgt.common.spi.TaskManagementService; import io.entgra.task.mgt.common.spi.TaskManagementService;
import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.ntask.core.service.TaskService;
import org.wso2.carbon.user.core.service.RealmService;
public class TaskWatcherDataHolder { public class TaskWatcherDataHolder {
private TaskManagementService taskManagerService; private TaskManagementService taskManagerService;
private TaskService nTaskService; private TaskService nTaskService;
private static TaskWatcherDataHolder thisInstance = new TaskWatcherDataHolder(); private HeartBeatManagementService heartBeatService;
private RealmService realmService;
private static final TaskWatcherDataHolder thisInstance = new TaskWatcherDataHolder();
private TaskWatcherDataHolder() {} private TaskWatcherDataHolder() {}
@ -50,4 +54,20 @@ public class TaskWatcherDataHolder {
this.nTaskService = nTaskService; this.nTaskService = nTaskService;
} }
public HeartBeatManagementService getHeartBeatService() {
return heartBeatService;
}
public void setHeartBeatService(HeartBeatManagementService heartBeatService) {
this.heartBeatService = heartBeatService;
}
public RealmService getRealmService() {
return this.realmService;
}
public void setRealmService(RealmService realmService) {
this.realmService = realmService;
}
} }

@ -17,7 +17,7 @@
*/ */
package io.entgra.task.mgt.watcher.internal; package io.entgra.task.mgt.watcher.internal;
import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import io.entgra.task.mgt.common.spi.TaskManagementService; import io.entgra.task.mgt.common.spi.TaskManagementService;
import io.entgra.task.mgt.core.config.TaskConfigurationManager; import io.entgra.task.mgt.core.config.TaskConfigurationManager;
import io.entgra.task.mgt.core.config.TaskManagementConfig; import io.entgra.task.mgt.core.config.TaskManagementConfig;
@ -28,7 +28,7 @@ import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext; import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.core.ServerStartupObserver; import org.wso2.carbon.core.ServerStartupObserver;
import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.ntask.core.service.TaskService;
import org.wso2.carbon.user.core.service.RealmService;
/** /**
* @scr.component * @scr.component
@ -45,6 +45,18 @@ import org.wso2.carbon.ntask.core.service.TaskService;
* policy="dynamic" * policy="dynamic"
* bind="setTaskMgtService" * bind="setTaskMgtService"
* unbind="unsetTaskMgtService" * unbind="unsetTaskMgtService"
* @scr.reference name="entgra.heart.beat.service"
* interface="io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService"
* cardinality="0..1"
* policy="dynamic"
* bind="setHeartBeatService"
* unbind="unsetHeartBeatService"
* @scr.reference name="user.realmservice.default"
* interface="org.wso2.carbon.user.core.service.RealmService"
* cardinality="1..1"
* policy="dynamic"
* bind="setRealmService"
* unbind="unsetRealmService"
*/ */
public class TaskWatcherServiceComponent { public class TaskWatcherServiceComponent {
@ -110,4 +122,46 @@ public class TaskWatcherServiceComponent {
TaskWatcherDataHolder.getInstance().setTaskManagementService(null); TaskWatcherDataHolder.getInstance().setTaskManagementService(null);
} }
@SuppressWarnings("unused")
protected void setHeartBeatService(HeartBeatManagementService heartBeatService) {
if (log.isDebugEnabled()) {
log.debug("Setting heart beat service to Task Manager Service Component");
}
TaskWatcherDataHolder.getInstance().setHeartBeatService(heartBeatService);
}
@SuppressWarnings("unused")
protected void unsetHeartBeatService(HeartBeatManagementService heartBeatManagementService) {
if (log.isDebugEnabled()) {
log.debug("Removing heart beat service from Task Manager Service Component");
}
TaskWatcherDataHolder.getInstance().setHeartBeatService(null);
}
/**
* Sets Realm Service.
*
* @param realmService An instance of RealmService
*/
@SuppressWarnings("unused")
protected void setRealmService(RealmService realmService) {
if (log.isDebugEnabled()) {
log.debug("Setting Realm Service");
}
TaskWatcherDataHolder.getInstance().setRealmService(realmService);
}
/**
* Unsets Realm Service.
*
* @param realmService An instance of RealmService
*/
@SuppressWarnings("unused")
protected void unsetRealmService(RealmService realmService) {
if (log.isDebugEnabled()) {
log.debug("Unsetting Realm Service");
}
TaskWatcherDataHolder.getInstance().setRealmService(null);
}
} }
Loading…
Cancel
Save