Add Fixes and improvements for Dynamic & random schedule tasks

remotes/1720607818248768243/tmp_refs/heads/master
Lasantha Dharmakeerthi 9 months ago
commit 09772dfd3f

@ -59,4 +59,5 @@ public class ScheduledAppSubscriptionCleanupTask extends RandomlyAssignedSchedul
public String getTaskName() { public String getTaskName() {
return TASK_NAME; return TASK_NAME;
} }
} }

@ -145,4 +145,5 @@ public class ScheduledAppSubscriptionTask extends RandomlyAssignedScheduleTask {
public String getTaskName() { public String getTaskName() {
return TASK_NAME; return TASK_NAME;
} }
} }

@ -365,13 +365,6 @@
<dependency> <dependency>
<groupId>org.wso2.orbit.javax.xml.bind</groupId> <groupId>org.wso2.orbit.javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId> <artifactId>jaxb-api</artifactId>
<version>2.3.1.wso2v1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.wso2.orbit.javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.1.wso2v1</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency> <dependency>

@ -18,9 +18,6 @@
package io.entgra.device.mgt.core.device.mgt.core.operation.timeout.task.impl; package io.entgra.device.mgt.core.device.mgt.core.operation.timeout.task.impl;
import com.google.gson.Gson; import com.google.gson.Gson;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException; import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException;
import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.Activity; import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.Activity;
import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.ActivityStatus; import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.ActivityStatus;
@ -29,75 +26,85 @@ import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.OperationManage
import io.entgra.device.mgt.core.device.mgt.core.config.operation.timeout.OperationTimeout; import io.entgra.device.mgt.core.device.mgt.core.config.operation.timeout.OperationTimeout;
import io.entgra.device.mgt.core.device.mgt.core.dto.DeviceType; import io.entgra.device.mgt.core.device.mgt.core.dto.DeviceType;
import io.entgra.device.mgt.core.device.mgt.core.internal.DeviceManagementDataHolder; import io.entgra.device.mgt.core.device.mgt.core.internal.DeviceManagementDataHolder;
import io.entgra.device.mgt.core.device.mgt.core.task.impl.DynamicPartitionedScheduleTask; import io.entgra.device.mgt.core.device.mgt.core.task.impl.RandomlyAssignedScheduleTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
public class OperationTimeoutTask extends DynamicPartitionedScheduleTask { public class OperationTimeoutTask extends RandomlyAssignedScheduleTask {
private static final Log log = LogFactory.getLog(OperationTimeoutTask.class); private static final Log log = LogFactory.getLog(OperationTimeoutTask.class);
public static final String OPERATION_TIMEOUT_TASK = "OPERATION_TIMEOUT_TASK";
private Map<String, String> properties;
@Override
public final void setProperties(Map<String, String> properties) {
this.properties = properties;
}
public final String getProperty(String name) {
if (properties == null) {
return null;
}
return properties.get(name);
}
@Override @Override
protected void setup() { protected void setup() {
} }
@Override @Override
protected void executeDynamicTask() { public String getTaskName() {
if (isQualifiedToExecuteTask()) { // this task will run only in one node when the deployment has multiple nodes return OPERATION_TIMEOUT_TASK;
String operationTimeoutTaskConfigStr = getProperty( }
OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG);
Gson gson = new Gson(); @Override
OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class); protected void executeRandomlyAssignedTask() {
try { // this task will run only in one node when the deployment has multiple nodes
long timeMillis = System.currentTimeMillis() - (long) operationTimeoutConfig.getTimeout(); String operationTimeoutTaskConfigStr = getProperty(
List<String> deviceTypes = new ArrayList<>(); OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG);
if (operationTimeoutConfig.getDeviceTypes().size() == 1 && Gson gson = new Gson();
"ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) { OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class);
try { try {
List<DeviceType> deviceTypeList = DeviceManagementDataHolder.getInstance() long timeMillis = System.currentTimeMillis() - (long) operationTimeoutConfig.getTimeout();
.getDeviceManagementProvider().getDeviceTypes(); List<String> deviceTypes = new ArrayList<>();
for (DeviceType deviceType : deviceTypeList) { if (operationTimeoutConfig.getDeviceTypes().size() == 1 &&
deviceTypes.add(deviceType.getName()); "ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) {
} try {
} catch (DeviceManagementException e) { List<DeviceType> deviceTypeList = DeviceManagementDataHolder.getInstance()
log.error("Error occurred while reading device types", e); .getDeviceManagementProvider().getDeviceTypes();
for (DeviceType deviceType : deviceTypeList) {
deviceTypes.add(deviceType.getName());
} }
} else { } catch (DeviceManagementException e) {
deviceTypes = operationTimeoutConfig.getDeviceTypes(); log.error("Error occurred while reading device types", e);
} }
List<Activity> activities = DeviceManagementDataHolder.getInstance().getOperationManager() } else {
.getActivities(deviceTypes, operationTimeoutConfig.getCode(), timeMillis, deviceTypes = operationTimeoutConfig.getDeviceTypes();
operationTimeoutConfig.getInitialStatus()); }
for (Activity activity : activities) { List<Activity> activities = DeviceManagementDataHolder.getInstance().getOperationManager()
for (ActivityStatus activityStatus : activity.getActivityStatus()) { .getActivities(deviceTypes, operationTimeoutConfig.getCode(), timeMillis,
String operationId = activity.getActivityId().replace("ACTIVITY_", ""); operationTimeoutConfig.getInitialStatus());
Operation operation = DeviceManagementDataHolder.getInstance().getOperationManager() String operationId;
.getOperation(Integer.parseInt(operationId)); Operation operation;
operation.setStatus(Operation.Status.valueOf(operationTimeoutConfig.getNextStatus())); for (Activity activity : activities) {
DeviceManagementDataHolder.getInstance().getOperationManager() operationId = activity.getActivityId().replace("ACTIVITY_", "");
.updateOperation(activityStatus.getDeviceIdentifier(), operation); for (ActivityStatus activityStatus : activity.getActivityStatus()) {
} operation = DeviceManagementDataHolder.getInstance().getOperationManager()
.getOperation(Integer.parseInt(operationId));
operation.setStatus(Operation.Status.valueOf(operationTimeoutConfig.getNextStatus()));
DeviceManagementDataHolder.getInstance().getOperationManager()
.updateOperation(activityStatus.getDeviceIdentifier(), operation);
} }
} catch (OperationManagementException e) {
String msg = "Error occurred while retrieving operations.";
log.error(msg, e);
} }
} catch (OperationManagementException e) {
String msg = "Error occurred while retrieving operations.";
log.error(msg, e);
} }
} }
private boolean isQualifiedToExecuteTask() {
if (isDynamicTaskEligible()) {
try {
return DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask();
} catch (HeartBeatManagementException e) {
log.error("Error while checking is qualified to execute task", e);
}
} else {
return true;
}
return false;
}
} }

@ -23,16 +23,16 @@ import org.apache.commons.logging.LogFactory;
import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalException; import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalException;
import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalService; import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalService;
import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalServiceImpl; import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalServiceImpl;
import org.wso2.carbon.ntask.core.Task;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class ArchivalTask implements Task { public class ArchivalTask extends RandomlyAssignedScheduleTask {
private static final Log log = LogFactory.getLog(ArchivalTask.class); private static final Log log = LogFactory.getLog(ArchivalTask.class);
private static final String TASK_NAME = "DATA_ARCHIVAL_TASK";
private ArchivalService archivalService; private ArchivalService archivalService;
@ -42,12 +42,12 @@ public class ArchivalTask implements Task {
} }
@Override @Override
public void init() { protected void setup() {
this.archivalService = new ArchivalServiceImpl(); this.archivalService = new ArchivalServiceImpl();
} }
@Override @Override
public void execute() { protected void executeRandomlyAssignedTask() {
log.info("Executing ArchivalTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date())); log.info("Executing ArchivalTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
try { try {
@ -60,6 +60,11 @@ public class ArchivalTask implements Task {
log.info("ArchivalTask completed. Total execution time: " + getDurationBreakdown(difference)); log.info("ArchivalTask completed. Total execution time: " + getDurationBreakdown(difference));
} }
@Override
public String getTaskName() {
return TASK_NAME;
}
private String getDurationBreakdown(long millis) { private String getDurationBreakdown(long millis) {
if (millis < 0) { if (millis < 0) {
throw new IllegalArgumentException("Duration must be greater than zero!"); throw new IllegalArgumentException("Duration must be greater than zero!");
@ -74,4 +79,5 @@ public class ArchivalTask implements Task {
return (days + " Days " + hours + " Hours " + minutes + " Minutes " + seconds + " Seconds"); return (days + " Days " + hours + " Hours " + minutes + " Minutes " + seconds + " Seconds");
} }
} }

@ -29,9 +29,10 @@ import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
public class ArchivedDataDeletionTask implements Task { public class ArchivedDataDeletionTask extends RandomlyAssignedScheduleTask {
private static Log log = LogFactory.getLog(ArchivedDataDeletionTask.class); private static final Log log = LogFactory.getLog(ArchivedDataDeletionTask.class);
private static final String TASK_NAME = "ARCHIVED_DATA_CLEANUP_TASK";
private ArchivalService archivalService; private ArchivalService archivalService;
@ -41,12 +42,12 @@ public class ArchivedDataDeletionTask implements Task {
} }
@Override @Override
public void init() { public void setup() {
this.archivalService = new ArchivalServiceImpl(); this.archivalService = new ArchivalServiceImpl();
} }
@Override @Override
public void execute() { protected void executeRandomlyAssignedTask() {
log.info("Executing DataDeletionTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date())); log.info("Executing DataDeletionTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
long startTime = System.nanoTime(); long startTime = System.nanoTime();
try { try {
@ -58,4 +59,10 @@ public class ArchivedDataDeletionTask implements Task {
long difference = (endTime - startTime) / (1000000 * 1000); long difference = (endTime - startTime) / (1000000 * 1000);
log.info("DataDeletionTask completed. Total execution time: " + difference + " seconds"); log.info("DataDeletionTask completed. Total execution time: " + difference + " seconds");
} }
@Override
public String getTaskName() {
return TASK_NAME;
}
} }

@ -109,4 +109,5 @@ public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask {
protected void setup() { protected void setup() {
} }
} }

@ -53,7 +53,7 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
public final void init() { public final void init() {
try { try {
boolean dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled(); boolean dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled();
if(dynamicTaskEnabled){ if (dynamicTaskEnabled) {
taskContext = new DynamicTaskContext(); taskContext = new DynamicTaskContext();
taskContext.setPartitioningEnabled(true); taskContext.setPartitioningEnabled(true);
} else { } else {
@ -75,9 +75,9 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
String localHashIndex = getProperty(TaskMgtConstants.Task.LOCAL_HASH_INDEX); String localHashIndex = getProperty(TaskMgtConstants.Task.LOCAL_HASH_INDEX);
// These tasks are not dynamically scheduled. They are added via a config so scheduled in each node // These tasks are not dynamically scheduled. They are added via a config so scheduled in each node
// during the server startup // during the server startup
if (localHashIndex == null ) { if (localHashIndex == null) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Executing startup scheduled task (" + getTaskName() + ") with class: " + log.debug("Executing startup scheduled task (" + getTaskName() + ") with class: " +
this.getClass().getName()); this.getClass().getName());
} }
executeDynamicTask(); executeDynamicTask();
@ -116,7 +116,7 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
private void updateContext() throws HeartBeatManagementException { private void updateContext() throws HeartBeatManagementException {
ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo(); ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo();
if(ctxInfo != null) { if (ctxInfo != null) {
populateContext(ctxInfo); populateContext(ctxInfo);
} else { } else {
log.info("Dynamic Task Context not present. Tasks will run on regular worker/manager mode."); log.info("Dynamic Task Context not present. Tasks will run on regular worker/manager mode.");
@ -127,10 +127,10 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
taskContext.setActiveServerCount(ctxInfo.getActiveServerCount()); taskContext.setActiveServerCount(ctxInfo.getActiveServerCount());
taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx()); taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx());
if(log.isDebugEnabled()){ if (log.isDebugEnabled()) {
log.debug("Initiating execution of dynamic task for server : " + taskContext.getServerHashIndex() + log.debug("Initiating execution of dynamic task for server : " + taskContext.getServerHashIndex() +
" where active server count is : " + taskContext.getActiveServerCount() + " where active server count is : " + taskContext.getActiveServerCount() +
" partitioning task enabled : " + taskContext.isPartitioningEnabled()); " partitioning task enabled : " + taskContext.isPartitioningEnabled());
} }
} }
@ -142,7 +142,8 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
return taskContext; return taskContext;
} }
public static boolean isDynamicTaskEligible(){ @Deprecated
public static boolean isDynamicTaskEligible() {
return taskContext != null && taskContext.isPartitioningEnabled(); return taskContext != null && taskContext.isPartitioningEnabled();
} }

@ -28,8 +28,6 @@ import org.wso2.carbon.ntask.core.Task;
public abstract class RandomlyAssignedScheduleTask implements Task { public abstract class RandomlyAssignedScheduleTask implements Task {
private static final Log log = LogFactory.getLog(RandomlyAssignedScheduleTask.class); private static final Log log = LogFactory.getLog(RandomlyAssignedScheduleTask.class);
private static String taskName = "UNSPECIFIED";
private static boolean qualifiedToExecuteTask = false; private static boolean qualifiedToExecuteTask = false;
private static boolean dynamicTaskEnabled = false; private static boolean dynamicTaskEnabled = false;
@ -38,32 +36,32 @@ public abstract class RandomlyAssignedScheduleTask implements Task {
try { try {
dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled(); dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled();
} catch (HeartBeatManagementException e) { } catch (HeartBeatManagementException e) {
log.error("Error Instantiating Variables necessary for Randomly Assigned Task Scheduling." , e); log.error("Error Instantiating Variables necessary for Randomly Assigned Task Scheduling.", e);
} }
//This is done so that sub class extending this abstract class is forced to specify a task name.
taskName = getTaskName();
setup(); setup();
} }
@Override @Override
public final void execute() { public final void execute() {
refreshContext(); refreshContext();
executeRandomlyAssignedTask(); if (isQualifiedToExecuteTask()) {
executeRandomlyAssignedTask();
}
} }
public void refreshContext(){ public void refreshContext() {
if(dynamicTaskEnabled) { if (dynamicTaskEnabled) {
try { try {
qualifiedToExecuteTask = DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask(); qualifiedToExecuteTask = DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask();
log.info("## NODE Qualified to execute Randomly Assigned Task : " + taskName);
DeviceManagementDataHolder.getInstance().getHeartBeatService().updateTaskExecutionAcknowledgement(taskName);
} catch (HeartBeatManagementException e) { } catch (HeartBeatManagementException e) {
log.error("Error refreshing Variables necessary for Randomly Assigned Scheduled Task. " + log.error("Error refreshing variables necessary for " +
"Dynamic Tasks will not function.", e); "Randomly Assigned Scheduled Task: " + getTaskName(), e);
} }
} else { } else {
qualifiedToExecuteTask = true; qualifiedToExecuteTask = true;
} }
log.info("Node is " + (qualifiedToExecuteTask ? "" : "not")
+ " qualified to execute Randomly Assigned Task : " + getTaskName());
} }
protected abstract void setup(); protected abstract void setup();
@ -75,4 +73,5 @@ public abstract class RandomlyAssignedScheduleTask implements Task {
} }
public abstract String getTaskName(); public abstract String getTaskName();
} }

@ -147,7 +147,7 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
} }
} catch (SQLException e) { } catch (SQLException e) {
String msg = "Error occurred while updating task list of elected server : '" + String msg = "Error occurred while updating task list of elected server : '" +
uuid + "' and task list " + taskList; uuid + "' and task list " + taskList;
log.error(msg, e); log.error(msg, e);
throw new HeartBeatDAOException(msg, e); throw new HeartBeatDAOException(msg, e);
} }

@ -151,7 +151,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
if (candidate != null && candidate.getServerUUID().equalsIgnoreCase(localServerUUID)) { if (candidate != null && candidate.getServerUUID().equalsIgnoreCase(localServerUUID)) {
isQualified = true; isQualified = true;
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Node : " + localServerUUID + " Qualified to execute randomly assigned task."); log.debug("Node : " + localServerUUID + " is qualified to execute randomly assigned task.");
} }
} }
} catch (HeartBeatDAOException e) { } catch (HeartBeatDAOException e) {

@ -64,7 +64,7 @@ public class DelegationTask extends DynamicPartitionedScheduleTask {
try { try {
devices = new ArrayList<>(); devices = new ArrayList<>();
toBeNotified = new ArrayList<>(); toBeNotified = new ArrayList<>();
if (isDynamicTaskEligible()) { if(getTaskContext() != null && getTaskContext().isPartitioningEnabled()){
devices.addAll(service.getAllocatedDevices(deviceType, devices.addAll(service.getAllocatedDevices(deviceType,
getTaskContext().getActiveServerCount(), getTaskContext().getActiveServerCount(),
getTaskContext().getServerHashIndex())); getTaskContext().getServerHashIndex()));

@ -105,7 +105,7 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask {
PolicyManagementDataHolder.getInstance().getDeviceManagementService() PolicyManagementDataHolder.getInstance().getDeviceManagementService()
.getPolicyMonitoringManager(deviceType); .getPolicyMonitoringManager(deviceType);
List<Device> devices; List<Device> devices;
if(isDynamicTaskEligible()){ if (getTaskContext() != null && getTaskContext().isPartitioningEnabled()) {
devices = deviceManagementProviderService devices = deviceManagementProviderService
.getAllocatedDevices(deviceType, getTaskContext().getActiveServerCount(), .getAllocatedDevices(deviceType, getTaskContext().getActiveServerCount(),
getTaskContext().getServerHashIndex()); getTaskContext().getServerHashIndex());

@ -49,5 +49,6 @@ public class TaskMgtConstants {
public static final String TENANT_ID_PROP = "__TENANT_ID_PROP__"; public static final String TENANT_ID_PROP = "__TENANT_ID_PROP__";
public static final String LOCAL_HASH_INDEX = "__LOCAL_HASH_INDEX__"; public static final String LOCAL_HASH_INDEX = "__LOCAL_HASH_INDEX__";
public static final String LOCAL_TASK_NAME = "__LOCAL_TASK_NAME__"; public static final String LOCAL_TASK_NAME = "__LOCAL_TASK_NAME__";
public static final String DYNAMIC_TASK_ID = "__DYNAMIC_TASK_ID__";
} }
} }

@ -22,6 +22,7 @@ import io.entgra.device.mgt.core.task.mgt.common.exception.TaskNotFoundException
import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementException; import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementException;
import java.util.List; import java.util.List;
import java.util.Map;
public interface TaskManagementService { public interface TaskManagementService {
@ -37,7 +38,9 @@ public interface TaskManagementService {
List<DynamicTask> getAllDynamicTasks() throws TaskManagementException; List<DynamicTask> getAllDynamicTasks() throws TaskManagementException;
DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementException; Map<Integer, List<DynamicTask>> getDynamicTasksForAllTenants() throws TaskManagementException;
DynamicTask getDynamicTask(int dynamicTaskId) throws TaskManagementException;
List<DynamicTask> getActiveDynamicTasks() throws TaskManagementException; List<DynamicTask> getActiveDynamicTasks() throws TaskManagementException;
} }

@ -27,16 +27,18 @@ import java.util.List;
*/ */
public interface DynamicTaskDAO { public interface DynamicTaskDAO {
int addTask(DynamicTask dynamicTask) throws TaskManagementDAOException; int addTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException;
boolean updateDynamicTask(DynamicTask dynamicTask) throws TaskManagementDAOException; boolean updateDynamicTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException;
void deleteDynamicTask(int dynamicTaskId) throws TaskManagementDAOException; void deleteDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException;
DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementDAOException; DynamicTask getDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException;
List<DynamicTask> getAllDynamicTasks() throws TaskManagementDAOException; List<DynamicTask> getAllDynamicTasks() throws TaskManagementDAOException;
List<DynamicTask> getActiveDynamicTasks() throws TaskManagementDAOException; List<DynamicTask> getAllDynamicTasks(int tenantId) throws TaskManagementDAOException;
List<DynamicTask> getActiveDynamicTasks(int tenantId) throws TaskManagementDAOException;
} }

@ -26,10 +26,11 @@ import java.util.Map;
*/ */
public interface DynamicTaskPropDAO { public interface DynamicTaskPropDAO {
void addTaskProperties(int dynamicTaskId, Map<String, String> properties) throws TaskManagementDAOException; void addTaskProperties(int dynamicTaskId, Map<String, String> properties, int tenantId)
throws TaskManagementDAOException;
Map<String, String> getDynamicTaskProps(int dynamicTaskId) throws TaskManagementDAOException; Map<String, String> getDynamicTaskProps(int dynamicTaskId, int tenantId) throws TaskManagementDAOException;
void updateDynamicTaskProps(int dynamicTaskId, Map<String, String> properties) void updateDynamicTaskProps(int dynamicTaskId, Map<String, String> properties, int tenantId)
throws TaskManagementDAOException; throws TaskManagementDAOException;
} }

@ -103,18 +103,22 @@ public class TaskManagementDAOFactory {
conn.setAutoCommit(false); conn.setAutoCommit(false);
currentConnection.set(conn); currentConnection.set(conn);
} catch (SQLException e) { } catch (SQLException e) {
throw new TransactionManagementException("Error occurred while retrieving config.datasource connection", e); throw new TransactionManagementException("Error occurred while retrieving datasource connection", e);
} }
} }
public static void openConnection() throws SQLException { public static void openConnection() throws TransactionManagementException {
Connection conn = currentConnection.get(); Connection conn = currentConnection.get();
if (conn != null) { if (conn != null) {
throw new IllegalTransactionStateException("A transaction is already active within the context of " + throw new IllegalTransactionStateException("A transaction is already active within the context of " +
"this particular thread. Therefore, calling 'beginTransaction/openConnection' while another " + "this particular thread. Therefore, calling 'beginTransaction/openConnection' while another " +
"transaction is already active is a sign of improper transaction handling"); "transaction is already active is a sign of improper transaction handling");
} }
conn = dataSource.getConnection(); try {
conn = dataSource.getConnection();
} catch (SQLException e) {
throw new TransactionManagementException("Error occurred while retrieving datasource connection", e);
}
currentConnection.set(conn); currentConnection.set(conn);
} }

@ -17,16 +17,19 @@
*/ */
package io.entgra.device.mgt.core.task.mgt.core.dao.impl; package io.entgra.device.mgt.core.task.mgt.core.dao.impl;
import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory;
import io.entgra.device.mgt.core.task.mgt.core.dao.util.TaskManagementDAOUtil;
import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask; import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask;
import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementDAOException; import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementDAOException;
import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskDAO; import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskDAO;
import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory;
import io.entgra.device.mgt.core.task.mgt.core.dao.util.TaskManagementDAOUtil;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import java.sql.*; import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List; import java.util.List;
@ -34,9 +37,9 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
private static final Log log = LogFactory.getLog(DynamicTaskDAOImpl.class); private static final Log log = LogFactory.getLog(DynamicTaskDAOImpl.class);
@Override @Override
public int addTask(DynamicTask dynamicTask) throws TaskManagementDAOException { public int addTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException {
PreparedStatement stmt = null; PreparedStatement stmt = null;
ResultSet rs = null; ResultSet rs;
int taskId = -1; int taskId = -1;
try { try {
Connection conn = TaskManagementDAOFactory.getConnection(); Connection conn = TaskManagementDAOFactory.getConnection();
@ -48,13 +51,14 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
stmt.setString(2, dynamicTask.getName()); stmt.setString(2, dynamicTask.getName());
stmt.setBoolean(3, dynamicTask.isEnabled()); stmt.setBoolean(3, dynamicTask.isEnabled());
stmt.setString(4, dynamicTask.getTaskClassName()); stmt.setString(4, dynamicTask.getTaskClassName());
stmt.setInt(5, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()); stmt.setInt(5, tenantId);
stmt.executeUpdate(); stmt.executeUpdate();
rs = stmt.getGeneratedKeys(); rs = stmt.getGeneratedKeys();
if (rs.next()) { if (rs.next()) {
taskId = rs.getInt(1); taskId = rs.getInt(1);
} }
dynamicTask.setDynamicTaskId(taskId);
return taskId; return taskId;
} catch (SQLException e) { } catch (SQLException e) {
String msg = "Error occurred while inserting task '" + dynamicTask.getName() + "'"; String msg = "Error occurred while inserting task '" + dynamicTask.getName() + "'";
@ -66,16 +70,17 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
} }
@Override @Override
public boolean updateDynamicTask(DynamicTask dynamicTask) throws TaskManagementDAOException { public boolean updateDynamicTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException {
PreparedStatement stmt = null; PreparedStatement stmt = null;
int rows; int rows;
try { try {
Connection conn = TaskManagementDAOFactory.getConnection(); Connection conn = TaskManagementDAOFactory.getConnection();
String sql = "UPDATE DYNAMIC_TASK SET CRON = ?,IS_ENABLED = ? WHERE DYNAMIC_TASK_ID = ?"; String sql = "UPDATE DYNAMIC_TASK SET CRON = ?,IS_ENABLED = ? WHERE DYNAMIC_TASK_ID = ? AND TENANT_ID = ?";
stmt = conn.prepareStatement(sql); stmt = conn.prepareStatement(sql);
stmt.setString(1, dynamicTask.getCronExpression()); stmt.setString(1, dynamicTask.getCronExpression());
stmt.setBoolean(2, dynamicTask.isEnabled()); stmt.setBoolean(2, dynamicTask.isEnabled());
stmt.setInt(3, dynamicTask.getDynamicTaskId()); stmt.setInt(3, dynamicTask.getDynamicTaskId());
stmt.setInt(4, tenantId);
rows = stmt.executeUpdate(); rows = stmt.executeUpdate();
return (rows > 0); return (rows > 0);
} catch (SQLException e) { } catch (SQLException e) {
@ -87,9 +92,8 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
} }
} }
@Override @Override
public void deleteDynamicTask(int dynamicTaskId) throws TaskManagementDAOException { public void deleteDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Request received in DAO Layer to delete dynamic task with the id: " + dynamicTaskId); log.debug("Request received in DAO Layer to delete dynamic task with the id: " + dynamicTaskId);
} }
@ -98,7 +102,7 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
Connection conn = TaskManagementDAOFactory.getConnection(); Connection conn = TaskManagementDAOFactory.getConnection();
try (PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { try (PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
stmt.setInt(1, dynamicTaskId); stmt.setInt(1, dynamicTaskId);
stmt.setInt(2, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()); stmt.setInt(2, tenantId);
stmt.executeUpdate(); stmt.executeUpdate();
} }
} catch (SQLException e) { } catch (SQLException e) {
@ -110,7 +114,7 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
} }
@Override @Override
public DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementDAOException { public DynamicTask getDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException {
DynamicTask dynamicTask = null; DynamicTask dynamicTask = null;
try { try {
Connection conn = TaskManagementDAOFactory.getConnection(); Connection conn = TaskManagementDAOFactory.getConnection();
@ -118,7 +122,7 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
try (PreparedStatement stmt = conn.prepareStatement(sql)) { try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, dynamicTaskId); stmt.setInt(1, dynamicTaskId);
stmt.setInt(2, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()); stmt.setInt(2, tenantId);
try (ResultSet rs = stmt.executeQuery()) { try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) { if (rs.next()) {
dynamicTask = TaskManagementDAOUtil.loadDynamicTask(rs); dynamicTask = TaskManagementDAOUtil.loadDynamicTask(rs);
@ -155,13 +159,35 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
} }
@Override @Override
public List<DynamicTask> getActiveDynamicTasks() throws TaskManagementDAOException { public List<DynamicTask> getAllDynamicTasks(int tenantId) throws TaskManagementDAOException {
List<DynamicTask> dynamicTasks = null; List<DynamicTask> dynamicTasks;
try {
Connection conn = TaskManagementDAOFactory.getConnection();
String sql = "SELECT * FROM DYNAMIC_TASK WHERE TENANT_ID = ?";
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, tenantId);
try (ResultSet rs = stmt.executeQuery()) {
dynamicTasks = TaskManagementDAOUtil.loadDynamicTasks(rs);
}
}
} catch (SQLException e) {
String msg = "Error occurred while getting all dynamic task data ";
log.error(msg, e);
throw new TaskManagementDAOException(msg, e);
}
return dynamicTasks;
}
@Override
public List<DynamicTask> getActiveDynamicTasks(int tenantId) throws TaskManagementDAOException {
List<DynamicTask> dynamicTasks;
try { try {
Connection conn = TaskManagementDAOFactory.getConnection(); Connection conn = TaskManagementDAOFactory.getConnection();
String sql = "SELECT * FROM DYNAMIC_TASK WHERE IS_ENABLED = 'true' "; String sql = "SELECT * FROM DYNAMIC_TASK WHERE IS_ENABLED = 'true' AND TENANT_ID = ?";
try (PreparedStatement stmt = conn.prepareStatement(sql)) { try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, tenantId);
try (ResultSet rs = stmt.executeQuery()) { try (ResultSet rs = stmt.executeQuery()) {
dynamicTasks = TaskManagementDAOUtil.loadDynamicTasks(rs); dynamicTasks = TaskManagementDAOUtil.loadDynamicTasks(rs);
} }
@ -173,4 +199,5 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
} }
return dynamicTasks; return dynamicTasks;
} }
} }

@ -38,9 +38,9 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
private static final Log log = LogFactory.getLog(DynamicTaskPropDAOImpl.class); private static final Log log = LogFactory.getLog(DynamicTaskPropDAOImpl.class);
@Override @Override
public void addTaskProperties(int taskId, Map<String, String> properties) public void addTaskProperties(int taskId, Map<String, String> properties, int tenantId)
throws TaskManagementDAOException { throws TaskManagementDAOException {
Connection conn = null; Connection conn;
PreparedStatement stmt = null; PreparedStatement stmt = null;
try { try {
conn = TaskManagementDAOFactory.getConnection(); conn = TaskManagementDAOFactory.getConnection();
@ -51,7 +51,7 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
stmt.setInt(1, taskId); stmt.setInt(1, taskId);
stmt.setString(2, propertyKey); stmt.setString(2, propertyKey);
stmt.setString(3, properties.get(propertyKey)); stmt.setString(3, properties.get(propertyKey));
stmt.setInt(4, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()); stmt.setInt(4, tenantId);
stmt.addBatch(); stmt.addBatch();
} }
stmt.executeBatch(); stmt.executeBatch();
@ -64,17 +64,17 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
} }
} }
public Map<String, String> getDynamicTaskProps(int dynamicTaskId, int tenantId) throws TaskManagementDAOException {
public Map<String, String> getDynamicTaskProps(int dynamicTaskId) throws TaskManagementDAOException { Connection conn;
Connection conn = null;
PreparedStatement stmt = null; PreparedStatement stmt = null;
ResultSet resultSet = null; ResultSet resultSet = null;
Map<String, String> properties; Map<String, String> properties;
try { try {
conn = TaskManagementDAOFactory.getConnection(); conn = TaskManagementDAOFactory.getConnection();
stmt = conn.prepareStatement( stmt = conn.prepareStatement(
"SELECT * FROM DYNAMIC_TASK_PROPERTIES WHERE DYNAMIC_TASK_ID = ?"); "SELECT * FROM DYNAMIC_TASK_PROPERTIES WHERE DYNAMIC_TASK_ID = ? AND TENANT_ID = ?");
stmt.setInt(1, dynamicTaskId); stmt.setInt(1, dynamicTaskId);
stmt.setInt(2, tenantId);
resultSet = stmt.executeQuery(); resultSet = stmt.executeQuery();
properties = new HashMap<>(); properties = new HashMap<>();
while (resultSet.next()) { while (resultSet.next()) {
@ -92,7 +92,7 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
} }
@Override @Override
public void updateDynamicTaskProps(int dynamicTaskId, Map<String, String> properties) public void updateDynamicTaskProps(int dynamicTaskId, Map<String, String> properties, int tenantId)
throws TaskManagementDAOException { throws TaskManagementDAOException {
if (properties.isEmpty()) { if (properties.isEmpty()) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
@ -105,12 +105,13 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
try { try {
conn = TaskManagementDAOFactory.getConnection(); conn = TaskManagementDAOFactory.getConnection();
stmt = conn.prepareStatement("UPDATE DYNAMIC_TASK_PROPERTIES SET PROPERTY_VALUE = ? " + stmt = conn.prepareStatement("UPDATE DYNAMIC_TASK_PROPERTIES SET PROPERTY_VALUE = ? " +
"WHERE DYNAMIC_TASK_ID = ? AND PROPERTY_NAME = ?"); "WHERE DYNAMIC_TASK_ID = ? AND PROPERTY_NAME = ? AND TENANT_ID = ?");
for (Map.Entry<String, String> entry : properties.entrySet()) { for (Map.Entry<String, String> entry : properties.entrySet()) {
stmt.setString(1, entry.getValue()); stmt.setString(1, entry.getValue());
stmt.setInt(2, dynamicTaskId); stmt.setInt(2, dynamicTaskId);
stmt.setString(3, entry.getKey()); stmt.setString(3, entry.getKey());
stmt.setInt(4, tenantId);
stmt.addBatch(); stmt.addBatch();
} }
stmt.executeBatch(); stmt.executeBatch();
@ -121,4 +122,5 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
TaskManagementDAOUtil.cleanupResources(stmt, null); TaskManagementDAOUtil.cleanupResources(stmt, null);
} }
} }
} }

@ -17,10 +17,6 @@
*/ */
package io.entgra.device.mgt.core.task.mgt.core.service; package io.entgra.device.mgt.core.task.mgt.core.service;
import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskPropDAO;
import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory;
import io.entgra.device.mgt.core.task.mgt.core.internal.TaskManagerDataHolder;
import io.entgra.device.mgt.core.task.mgt.core.util.TaskManagementUtil;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask; import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask;
import io.entgra.device.mgt.core.task.mgt.common.constant.TaskMgtConstants; import io.entgra.device.mgt.core.task.mgt.common.constant.TaskMgtConstants;
@ -30,14 +26,21 @@ import io.entgra.device.mgt.core.task.mgt.common.exception.TaskNotFoundException
import io.entgra.device.mgt.core.task.mgt.common.exception.TransactionManagementException; import io.entgra.device.mgt.core.task.mgt.common.exception.TransactionManagementException;
import io.entgra.device.mgt.core.task.mgt.common.spi.TaskManagementService; import io.entgra.device.mgt.core.task.mgt.common.spi.TaskManagementService;
import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskDAO; import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskDAO;
import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskPropDAO;
import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory;
import io.entgra.device.mgt.core.task.mgt.core.internal.TaskManagerDataHolder;
import io.entgra.device.mgt.core.task.mgt.core.util.TaskManagementUtil;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.ntask.common.TaskException; import org.wso2.carbon.ntask.common.TaskException;
import org.wso2.carbon.ntask.core.TaskInfo; import org.wso2.carbon.ntask.core.TaskInfo;
import org.wso2.carbon.ntask.core.TaskManager; import org.wso2.carbon.ntask.core.TaskManager;
import org.wso2.carbon.ntask.core.service.TaskService; import org.wso2.carbon.ntask.core.service.TaskService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -78,42 +81,43 @@ public class TaskManagementServiceImpl implements TaskManagementService {
@Override @Override
public void createTask(DynamicTask dynamicTask) throws TaskManagementException { public void createTask(DynamicTask dynamicTask) throws TaskManagementException {
String taskId; String nTaskName;
int dynamicTaskId;
int serverHashIdx;
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
try { try {
// add into the dynamic task tables // add into the dynamic task tables
TaskManagementDAOFactory.beginTransaction(); TaskManagementDAOFactory.beginTransaction();
int dynamicTaskId = dynamicTaskDAO.addTask(dynamicTask); dynamicTaskId = dynamicTaskDAO.addTask(dynamicTask, tenantId);
dynamicTaskPropDAO.addTaskProperties(dynamicTaskId, dynamicTask.getProperties(), tenantId);
Map<String, String> taskProperties = dynamicTask.getProperties();
dynamicTaskPropDAO.addTaskProperties(dynamicTaskId, taskProperties);
// add into the ntask core
taskId = TaskManagementUtil.generateTaskId(dynamicTaskId);
try { try {
int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService() serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx(); .getServerCtxInfo().getLocalServerHashIdx();
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); nTaskName = TaskManagementUtil.generateNTaskName(dynamicTaskId, serverHashIdx);
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, taskId);
} catch (HeartBeatManagementException e) { } catch (HeartBeatManagementException e) {
String msg = "Unexpected exception when getting server hash index."; String msg = "Unexpected exception when getting server hash index.";
log.error(msg, e); log.error(msg, e);
throw new TaskManagementException(msg, e); throw new TaskManagementException(msg, e);
} }
if (!isTaskExists(taskId)) { if (isTaskExists(nTaskName)) {
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); String msg = "Task '" + nTaskName + "' is already exists in the ntask core. "
triggerInfo.setCronExpression(dynamicTask.getCronExpression()); + "Hence removing existing task from nTask before adding new one.";
TaskInfo taskInfo = new TaskInfo(taskId, dynamicTask.getTaskClassName(), taskProperties, triggerInfo); log.warn(msg);
taskManager.registerTask(taskInfo); taskManager.deleteTask(nTaskName);
taskManager.scheduleTask(taskId); }
if (!dynamicTask.isEnabled()) {
taskManager.pauseTask(taskId); // add into the ntask core
} Map<String, String> taskProperties = TaskManagementUtil
} else { .populateNTaskProperties(dynamicTask, nTaskName, serverHashIdx);
String msg = "Task '" + taskId + "' is already exists in the ntask core " TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
+ "Hence not creating another task for the same name."; triggerInfo.setCronExpression(dynamicTask.getCronExpression());
log.error(msg); TaskInfo taskInfo = new TaskInfo(nTaskName, dynamicTask.getTaskClassName(), taskProperties, triggerInfo);
taskManager.registerTask(taskInfo);
taskManager.scheduleTask(nTaskName);
if (!dynamicTask.isEnabled()) {
taskManager.pauseTask(nTaskName);
} }
TaskManagementDAOFactory.commitTransaction(); TaskManagementDAOFactory.commitTransaction();
@ -137,19 +141,20 @@ public class TaskManagementServiceImpl implements TaskManagementService {
} }
@Override @Override
public void updateTask(int dynamicTaskId, DynamicTask dynamicTask) throws TaskManagementException public void updateTask(int dynamicTaskId, DynamicTask dynamicTask)
, TaskNotFoundException { throws TaskManagementException, TaskNotFoundException {
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
try { try {
//Update dynamic task table //Update dynamic task table
TaskManagementDAOFactory.beginTransaction(); TaskManagementDAOFactory.beginTransaction();
DynamicTask existingTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId); DynamicTask existingTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId);
if (existingTask != null) { if (existingTask != null) {
existingTask.setEnabled(dynamicTask.isEnabled()); existingTask.setEnabled(dynamicTask.isEnabled());
existingTask.setCronExpression(dynamicTask.getCronExpression()); existingTask.setCronExpression(dynamicTask.getCronExpression());
dynamicTaskDAO.updateDynamicTask(existingTask); dynamicTaskDAO.updateDynamicTask(existingTask, tenantId);
if (!dynamicTask.getProperties().isEmpty()) { if (!dynamicTask.getProperties().isEmpty()) {
dynamicTaskPropDAO.updateDynamicTaskProps(dynamicTaskId, dynamicTask.getProperties()); dynamicTaskPropDAO.updateDynamicTaskProps(dynamicTaskId, dynamicTask.getProperties(), tenantId);
} }
} else { } else {
String msg = "Task '" + dynamicTaskId + "' is not exists in the dynamic task table."; String msg = "Task '" + dynamicTaskId + "' is not exists in the dynamic task table.";
@ -158,12 +163,14 @@ public class TaskManagementServiceImpl implements TaskManagementService {
} }
// Update task in the ntask core // Update task in the ntask core
String taskId = TaskManagementUtil.generateTaskId(existingTask.getDynamicTaskId()); String nTaskName = TaskManagementUtil.generateNTaskName(existingTask.getDynamicTaskId());
if (isTaskExists(taskId)) { if (isTaskExists(nTaskName)) {
TaskInfo taskInfo = taskManager.getTask(taskId); TaskInfo taskInfo = taskManager.getTask(nTaskName);
if (!dynamicTask.getProperties().isEmpty()) {
taskInfo.setProperties(dynamicTask.getProperties()); Map<String, String> taskProperties = TaskManagementUtil
} .populateNTaskProperties(dynamicTask, nTaskName);
taskInfo.setProperties(taskProperties);
TaskInfo.TriggerInfo triggerInfo; TaskInfo.TriggerInfo triggerInfo;
if (taskInfo.getTriggerInfo() == null) { if (taskInfo.getTriggerInfo() == null) {
triggerInfo = new TaskInfo.TriggerInfo(); triggerInfo = new TaskInfo.TriggerInfo();
@ -173,9 +180,9 @@ public class TaskManagementServiceImpl implements TaskManagementService {
triggerInfo.setCronExpression(dynamicTask.getCronExpression()); triggerInfo.setCronExpression(dynamicTask.getCronExpression());
taskInfo.setTriggerInfo(triggerInfo); taskInfo.setTriggerInfo(triggerInfo);
taskManager.registerTask(taskInfo); taskManager.registerTask(taskInfo);
taskManager.rescheduleTask(taskId); taskManager.rescheduleTask(nTaskName);
} else { } else {
String msg = "Task '" + taskId + "' is not exists in the n task core " String msg = "Task '" + nTaskName + "' is not exists in the n task core "
+ "Hence cannot update the task."; + "Hence cannot update the task.";
log.error(msg); log.error(msg);
} }
@ -200,16 +207,17 @@ public class TaskManagementServiceImpl implements TaskManagementService {
} }
@Override @Override
public void toggleTask(int dynamicTaskId, boolean isEnabled) throws TaskManagementException public void toggleTask(int dynamicTaskId, boolean isEnabled)
, TaskNotFoundException { throws TaskManagementException, TaskNotFoundException {
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
try { try {
//update dynamic task table //update dynamic task table
TaskManagementDAOFactory.beginTransaction(); TaskManagementDAOFactory.beginTransaction();
DynamicTask existingTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId); DynamicTask existingTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId);
if (existingTask != null) { if (existingTask != null) {
existingTask.setEnabled(isEnabled); existingTask.setEnabled(isEnabled);
dynamicTaskDAO.updateDynamicTask(existingTask); dynamicTaskDAO.updateDynamicTask(existingTask, tenantId);
} else { } else {
String msg = "Task '" + dynamicTaskId + "' is not exists."; String msg = "Task '" + dynamicTaskId + "' is not exists.";
log.error(msg); log.error(msg);
@ -217,15 +225,15 @@ public class TaskManagementServiceImpl implements TaskManagementService {
} }
// Update task in the ntask core // Update task in the ntask core
String taskId = TaskManagementUtil.generateTaskId(existingTask.getDynamicTaskId()); String taskName = TaskManagementUtil.generateNTaskName(existingTask.getDynamicTaskId());
if (isTaskExists(taskId)) { if (isTaskExists(taskName)) {
if (isEnabled) { if (isEnabled) {
taskManager.resumeTask(taskId); taskManager.resumeTask(taskName);
} else { } else {
taskManager.pauseTask(taskId); taskManager.pauseTask(taskName);
} }
} else { } else {
String msg = "Task '" + taskId + "' is not exists in the ntask core " String msg = "Task '" + taskName + "' is not exists in the ntask core "
+ "Hence cannot toggle the task in the ntask."; + "Hence cannot toggle the task in the ntask.";
log.error(msg); log.error(msg);
} }
@ -251,22 +259,23 @@ public class TaskManagementServiceImpl implements TaskManagementService {
@Override @Override
public void deleteTask(int dynamicTaskId) throws TaskManagementException, TaskNotFoundException { public void deleteTask(int dynamicTaskId) throws TaskManagementException, TaskNotFoundException {
// delete task from dynamic task table // delete task from dynamic task table
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
try { try {
TaskManagementDAOFactory.beginTransaction(); TaskManagementDAOFactory.beginTransaction();
DynamicTask existingTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId); DynamicTask existingTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId);
if (existingTask != null) { if (existingTask != null) {
dynamicTaskDAO.deleteDynamicTask(dynamicTaskId); dynamicTaskDAO.deleteDynamicTask(dynamicTaskId, tenantId);
} else { } else {
String msg = "Task '" + dynamicTaskId + "' is not exists."; String msg = "Task '" + dynamicTaskId + "' is not exists.";
log.error(msg); log.error(msg);
throw new TaskNotFoundException(msg); throw new TaskNotFoundException(msg);
} }
String taskId = TaskManagementUtil.generateTaskId(existingTask.getDynamicTaskId()); String taskName = TaskManagementUtil.generateNTaskName(existingTask.getDynamicTaskId());
if (isTaskExists(taskId)) { if (isTaskExists(taskName)) {
taskManager.deleteTask(taskId); taskManager.deleteTask(taskName);
} else { } else {
String msg = "Task '" + taskId + "' is not exists in the ntask core " String msg = "Task '" + taskName + "' is not exists in the ntask core "
+ "Hence cannot delete from the ntask core."; + "Hence cannot delete from the ntask core.";
log.error(msg); log.error(msg);
} }
@ -292,22 +301,21 @@ public class TaskManagementServiceImpl implements TaskManagementService {
@Override @Override
public List<DynamicTask> getAllDynamicTasks() throws TaskManagementException { public List<DynamicTask> getAllDynamicTasks() throws TaskManagementException {
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
List<DynamicTask> dynamicTasks; List<DynamicTask> dynamicTasks;
try { try {
if (log.isDebugEnabled()) { if (log.isTraceEnabled()) {
log.debug("Fetching the details of all dynamic tasks"); log.trace("Fetching the details of all dynamic tasks");
} }
TaskManagementDAOFactory.beginTransaction(); TaskManagementDAOFactory.openConnection();
dynamicTasks = dynamicTaskDAO.getAllDynamicTasks(); dynamicTasks = dynamicTaskDAO.getAllDynamicTasks(tenantId);
if (dynamicTasks != null) { if (dynamicTasks != null) {
for (DynamicTask dynamicTask : dynamicTasks) { for (DynamicTask dynamicTask : dynamicTasks) {
dynamicTask.setProperties(dynamicTaskPropDAO dynamicTask.setProperties(dynamicTaskPropDAO
.getDynamicTaskProps(dynamicTask.getDynamicTaskId())); .getDynamicTaskProps(dynamicTask.getDynamicTaskId(), tenantId));
} }
} }
TaskManagementDAOFactory.commitTransaction();
} catch (TaskManagementDAOException e) { } catch (TaskManagementDAOException e) {
TaskManagementDAOFactory.rollbackTransaction();
String msg = "Error occurred while fetching all dynamic tasks"; String msg = "Error occurred while fetching all dynamic tasks";
log.error(msg, e); log.error(msg, e);
throw new TaskManagementException(msg, e); throw new TaskManagementException(msg, e);
@ -322,20 +330,62 @@ public class TaskManagementServiceImpl implements TaskManagementService {
} }
@Override @Override
public DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementException { public Map<Integer, List<DynamicTask>> getDynamicTasksForAllTenants() throws TaskManagementException {
List<DynamicTask> dynamicTasks;
try {
if (log.isTraceEnabled()) {
log.trace("Fetching the details of dynamic tasks for all tenants");
}
TaskManagementDAOFactory.openConnection();
dynamicTasks = dynamicTaskDAO.getAllDynamicTasks();
if (dynamicTasks != null) {
for (DynamicTask dynamicTask : dynamicTasks) {
dynamicTask.setProperties(dynamicTaskPropDAO
.getDynamicTaskProps(dynamicTask.getDynamicTaskId(), dynamicTask.getTenantId()));
}
}
} catch (TaskManagementDAOException e) {
String msg = "Error occurred while fetching all dynamic tasks";
log.error(msg, e);
throw new TaskManagementException(msg, e);
} catch (TransactionManagementException e) {
String msg = "Failed to start/open transaction to get all dynamic tasks";
log.error(msg, e);
throw new TaskManagementException(msg, e);
} finally {
TaskManagementDAOFactory.closeConnection();
}
Map<Integer, List<DynamicTask>> tenantedDynamicTasks = new HashMap<>();
List<DynamicTask> dts;
if (dynamicTasks != null) {
for (DynamicTask dt : dynamicTasks) {
if (tenantedDynamicTasks.containsKey(dt.getTenantId())) {
dts = tenantedDynamicTasks.get(dt.getTenantId());
} else {
dts = new ArrayList<>();
}
dts.add(dt);
tenantedDynamicTasks.put(dt.getTenantId(), dts);
}
}
return tenantedDynamicTasks;
}
@Override
public DynamicTask getDynamicTask(int dynamicTaskId) throws TaskManagementException {
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
DynamicTask dynamicTask; DynamicTask dynamicTask;
try { try {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Fetching the details of dynamic task '" + dynamicTaskId + "'"); log.debug("Fetching the details of dynamic task '" + dynamicTaskId + "'");
} }
TaskManagementDAOFactory.beginTransaction(); TaskManagementDAOFactory.openConnection();
dynamicTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId); dynamicTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId);
if (dynamicTask != null) { if (dynamicTask != null) {
dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId())); dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId(),
tenantId));
} }
TaskManagementDAOFactory.commitTransaction();
} catch (TaskManagementDAOException e) { } catch (TaskManagementDAOException e) {
TaskManagementDAOFactory.rollbackTransaction();
String msg = "Error occurred while fetching dynamic task '" + dynamicTaskId + "'"; String msg = "Error occurred while fetching dynamic task '" + dynamicTaskId + "'";
log.error(msg, e); log.error(msg, e);
throw new TaskManagementException(msg, e); throw new TaskManagementException(msg, e);
@ -351,21 +401,21 @@ public class TaskManagementServiceImpl implements TaskManagementService {
@Override @Override
public List<DynamicTask> getActiveDynamicTasks() throws TaskManagementException { public List<DynamicTask> getActiveDynamicTasks() throws TaskManagementException {
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
List<DynamicTask> dynamicTasks; List<DynamicTask> dynamicTasks;
try { try {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Fetching the details of all active dynamic tasks"); log.debug("Fetching the details of all active dynamic tasks");
} }
TaskManagementDAOFactory.beginTransaction(); TaskManagementDAOFactory.openConnection();
dynamicTasks = dynamicTaskDAO.getActiveDynamicTasks(); dynamicTasks = dynamicTaskDAO.getActiveDynamicTasks(tenantId);
if (dynamicTasks != null) { if (dynamicTasks != null) {
for (DynamicTask dynamicTask : dynamicTasks) { for (DynamicTask dynamicTask : dynamicTasks) {
dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId())); dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId(),
tenantId));
} }
} }
TaskManagementDAOFactory.commitTransaction();
} catch (TaskManagementDAOException e) { } catch (TaskManagementDAOException e) {
TaskManagementDAOFactory.rollbackTransaction();
String msg = "Error occurred while fetching all active dynamic tasks"; String msg = "Error occurred while fetching all active dynamic tasks";
log.error(msg, e); log.error(msg, e);
throw new TaskManagementException(msg, e); throw new TaskManagementException(msg, e);
@ -380,18 +430,19 @@ public class TaskManagementServiceImpl implements TaskManagementService {
} }
// check whether task exist in the ntask core // check whether task exist in the ntask core
private boolean isTaskExists(String taskId) throws TaskManagementException, TaskException { private boolean isTaskExists(String taskName) throws TaskManagementException, TaskException {
if (StringUtils.isEmpty(taskId)) { if (StringUtils.isEmpty(taskName)) {
String msg = "Task ID must not be null or empty."; String msg = "Task Name must not be null or empty.";
log.error(msg); log.error(msg);
throw new TaskManagementException(msg); throw new TaskManagementException(msg);
} }
List<TaskInfo> tasks = taskManager.getAllTasks(); List<TaskInfo> tasks = taskManager.getAllTasks();
for (TaskInfo t : tasks) { for (TaskInfo t : tasks) {
if (taskId.equals(t.getName())) { if (taskName.equals(t.getName())) {
return true; return true;
} }
} }
return false; return false;
} }
} }

@ -17,20 +17,21 @@
*/ */
package io.entgra.device.mgt.core.task.mgt.core.util; package io.entgra.device.mgt.core.task.mgt.core.util;
import com.google.gson.Gson;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask;
import io.entgra.device.mgt.core.task.mgt.common.constant.TaskMgtConstants; import io.entgra.device.mgt.core.task.mgt.common.constant.TaskMgtConstants;
import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementException; import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementException;
import io.entgra.device.mgt.core.task.mgt.core.internal.TaskManagerDataHolder; import io.entgra.device.mgt.core.task.mgt.core.internal.TaskManagerDataHolder;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.w3c.dom.Document; import org.w3c.dom.Document;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import javax.xml.XMLConstants; import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File; import java.io.File;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
@ -55,11 +56,11 @@ public class TaskManagementUtil {
} }
} }
public static String generateTaskId(int dynamicTaskId) throws TaskManagementException { public static String generateNTaskName(int dynamicTaskId) throws TaskManagementException {
try { try {
int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService() int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx(); .getServerCtxInfo().getLocalServerHashIdx();
return generateTaskId(dynamicTaskId, serverHashIdx); return generateNTaskName(dynamicTaskId, serverHashIdx);
} catch (HeartBeatManagementException e) { } catch (HeartBeatManagementException e) {
String msg = "Failed to generate task id for a dynamic task " + dynamicTaskId; String msg = "Failed to generate task id for a dynamic task " + dynamicTaskId;
log.error(msg, e); log.error(msg, e);
@ -67,18 +68,33 @@ public class TaskManagementUtil {
} }
} }
public static String generateTaskId(int dynamicTaskId, int serverHashIdx) { public static String generateNTaskName(int dynamicTaskId, int serverHashIdx) {
return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId
+ TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx; + TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx;
} }
public static String generateTaskPropsMD5(Map<String, String> taskProperties) { public static Map<String, String> populateNTaskProperties(DynamicTask dynamicTask,
taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP); String nTaskName) throws TaskManagementException {
taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX); try {
taskProperties.remove(TaskMgtConstants.Task.LOCAL_TASK_NAME); int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
Gson gson = new Gson(); .getServerCtxInfo().getLocalServerHashIdx();
String json = gson.toJson(taskProperties); return populateNTaskProperties(dynamicTask, nTaskName, serverHashIdx);
return DigestUtils.md5Hex(json); } catch (HeartBeatManagementException e) {
String msg = "Failed to populate nTask properties a dynamic task " + dynamicTask.getDynamicTaskId();
log.error(msg, e);
throw new TaskManagementException(msg, e);
}
}
public static Map<String, String> populateNTaskProperties(DynamicTask dynamicTask,
String nTaskName, int serverHashIdx) {
Map<String, String> taskProperties = new HashMap<>();
taskProperties.put(TaskMgtConstants.Task.DYNAMIC_TASK_ID, String.valueOf(dynamicTask.getDynamicTaskId()));
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, nTaskName);
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
taskProperties.put(TaskMgtConstants.Task.TENANT_ID_PROP,
String.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
return taskProperties;
} }
} }

@ -38,7 +38,6 @@ import org.wso2.carbon.user.core.service.RealmService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -69,47 +68,38 @@ public class IoTSStartupHandler implements ServerStartupObserver {
private void compareTasks() { private void compareTasks() {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Comparing Tasks from carbon nTask manager and entgra task manager"); log.debug("Comparing Tasks from carbon nTask manager and Entgra task manager.");
} }
TaskService nTaskService = TaskWatcherDataHolder.getInstance().getnTaskService(); TaskService nTaskService = TaskWatcherDataHolder.getInstance().getnTaskService();
if (nTaskService == null) { if (nTaskService == null) {
String msg = "Unable to load TaskService from the carbon nTask core"; String msg = "Unable to load TaskService from the carbon nTask core.";
log.error(msg); log.error(msg);
return; return;
} }
try { try {
List<DynamicTask> dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService() Map<Integer, List<DynamicTask>> tenantedDynamicTasks = TaskWatcherDataHolder.getInstance()
.getAllDynamicTasks(); .getTaskManagementService().getDynamicTasksForAllTenants();
scheduleMissingTasks(nTaskService, dynamicTasks); scheduleMissingTasks(nTaskService, tenantedDynamicTasks);
deleteObsoleteTasks(nTaskService, dynamicTasks); deleteObsoleteTasks(nTaskService, tenantedDynamicTasks);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Task Comparison Completed and all tasks in current node are updated"); log.debug("Task Comparison Completed and all tasks in current node are updated.");
} }
} catch (TaskException e) { } catch (TaskException e) {
String msg = "Error occurred while accessing carbon nTask manager."; String msg = "Error occurred while accessing carbon nTask manager.";
log.error(msg, e); log.error(msg, e);
} catch (TaskManagementException e) { } catch (TaskManagementException e) {
String msg = "Error occurred while retrieving all active tasks from entgra task manager"; String msg = "Error occurred while retrieving all active tasks from Entgra task manager.";
log.error(msg, e); log.error(msg, e);
} }
} }
private static void scheduleMissingTasks(TaskService nTaskService, List<DynamicTask> dynamicTasks) private static void scheduleMissingTasks(TaskService nTaskService, Map<Integer,
List<DynamicTask>> tenantedDynamicTasks)
throws TaskException, TaskManagementException { throws TaskException, TaskManagementException {
Map<Integer, List<DynamicTask>> tenantedDynamicTasks = new HashMap<>();
List<DynamicTask> dts;
for (DynamicTask dt : dynamicTasks) {
if (tenantedDynamicTasks.containsKey(dt.getTenantId())) {
dts = tenantedDynamicTasks.get(dt.getTenantId());
} else {
dts = new ArrayList<>();
}
dts.add(dt);
tenantedDynamicTasks.put(dt.getTenantId(), dts);
}
TaskManager taskManager; TaskManager taskManager;
for (Integer tenantId : tenantedDynamicTasks.keySet()) { for (Integer tenantId : tenantedDynamicTasks.keySet()) {
if (tenantId == -1) { if (tenantId == -1) {
@ -126,36 +116,56 @@ public class IoTSStartupHandler implements ServerStartupObserver {
List<TaskInfo> tasks = taskManager.getAllTasks(); List<TaskInfo> tasks = taskManager.getAllTasks();
// add or update task into nTask core // add or update task into nTask core
for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) { for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) {
String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()); int serverHashIdx;
try {
serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx();
} catch (HeartBeatManagementException e) {
String msg = "Failed to get server hash index for dynamic task " + dt.getDynamicTaskId();
log.error(msg, e);
throw new TaskManagementException(msg, e);
}
String nTaskName = TaskManagementUtil.generateNTaskName(dt.getDynamicTaskId(), serverHashIdx);
boolean isExist = false; boolean isExist = false;
for (TaskInfo taskInfo : tasks) { for (TaskInfo taskInfo : tasks) {
if (taskInfo.getName().equals(generatedTaskId)) { if (taskInfo.getName().equals(nTaskName)) {
isExist = true;
TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo(); TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo();
String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(dt.getProperties()); if (taskInfo.getProperties() == null) {
String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties()); String msg = "Task properties not found for task " + nTaskName
if (!triggerInfo.getCronExpression().equals(dt.getCronExpression()) + ". Therefore deleting the nTask schedule.";
|| !dynamicTaskPropMD5.equals(existingTaskPropMD5)) { log.warn(msg);
taskManager.deleteTask(nTaskName);
break;
}
isExist = true;
if (!triggerInfo.getCronExpression().equals(dt.getCronExpression())) {
triggerInfo.setCronExpression(dt.getCronExpression()); triggerInfo.setCronExpression(dt.getCronExpression());
taskInfo.setTriggerInfo(triggerInfo); taskInfo.setTriggerInfo(triggerInfo);
taskInfo.setProperties(populateTaskProperties(tenantId, generatedTaskId, dt.getProperties())); taskInfo.setProperties(TaskManagementUtil
.populateNTaskProperties(dt, taskInfo.getName(), serverHashIdx));
taskManager.registerTask(taskInfo); taskManager.registerTask(taskInfo);
taskManager.rescheduleTask(generatedTaskId); taskManager.rescheduleTask(nTaskName);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table"); log.debug("Task - '" + nTaskName
+ "' updated according to the dynamic task table.");
} }
} }
if (dt.isEnabled() if (dt.isEnabled()
&& taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) { && taskManager.getTaskState(nTaskName) == TaskManager.TaskState.PAUSED) {
taskManager.resumeTask(generatedTaskId); taskManager.resumeTask(nTaskName);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table"); log.debug("Task - '" + nTaskName
+ "' enabled according to the dynamic task table.");
} }
} else if (!dt.isEnabled() } else if (!dt.isEnabled()
&& taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) { && taskManager.getTaskState(nTaskName) != TaskManager.TaskState.PAUSED) {
taskManager.pauseTask(generatedTaskId); taskManager.pauseTask(nTaskName);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table"); log.debug("Task - '" + nTaskName
+ "' disabled according to the dynamic task table.");
} }
} }
break; break;
@ -164,12 +174,12 @@ public class IoTSStartupHandler implements ServerStartupObserver {
if (!isExist) { if (!isExist) {
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
triggerInfo.setCronExpression(dt.getCronExpression()); triggerInfo.setCronExpression(dt.getCronExpression());
TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(), TaskInfo taskInfo = new TaskInfo(nTaskName, dt.getTaskClassName(), TaskManagementUtil
populateTaskProperties(tenantId, generatedTaskId, dt.getProperties()), triggerInfo); .populateNTaskProperties(dt, nTaskName, serverHashIdx), triggerInfo);
taskManager.registerTask(taskInfo); taskManager.registerTask(taskInfo);
taskManager.scheduleTask(generatedTaskId); taskManager.scheduleTask(nTaskName);
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table"); log.debug("New task -'" + nTaskName + "' created according to the dynamic task table.");
} }
} }
} }
@ -177,24 +187,8 @@ public class IoTSStartupHandler implements ServerStartupObserver {
} }
} }
private static Map<String, String> populateTaskProperties(int tenantId, String generatedTaskId, private static void deleteObsoleteTasks(TaskService nTaskService,
Map<String, String> taskProperties) Map<Integer, List<DynamicTask>> tenantedDynamicTasks)
throws TaskManagementException {
try {
int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx();
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId);
taskProperties.put(TaskMgtConstants.Task.TENANT_ID_PROP, String.valueOf(tenantId));
return taskProperties;
} catch (HeartBeatManagementException e) {
String msg = "Unexpected exception when getting server hash index.";
log.error(msg, e);
throw new TaskManagementException(msg, e);
}
}
private static void deleteObsoleteTasks(TaskService nTaskService, List<DynamicTask> dynamicTasks)
throws TaskManagementException, TaskException { throws TaskManagementException, TaskException {
List<Tenant> tenants = new ArrayList<>(); List<Tenant> tenants = new ArrayList<>();
@ -224,6 +218,13 @@ public class IoTSStartupHandler implements ServerStartupObserver {
} }
for (Tenant tenant : tenants) { for (Tenant tenant : tenants) {
if (tenantedDynamicTasks.get(tenant.getId()) == null) {
if (log.isTraceEnabled()) {
log.trace("Dynamic tasks not running for tenant: [" + tenant.getId() + "] "
+ tenant.getDomain());
}
continue;
}
PrivilegedCarbonContext.startTenantFlow(); PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true);
if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
@ -234,10 +235,10 @@ public class IoTSStartupHandler implements ServerStartupObserver {
// Remove deleted items from the nTask core // Remove deleted items from the nTask core
for (TaskInfo taskInfo : tasks) { for (TaskInfo taskInfo : tasks) {
boolean isExist = false; boolean isExist = false;
for (DynamicTask dt : dynamicTasks) { for (DynamicTask dt : tenantedDynamicTasks.get(tenant.getId())) {
for (int hid : hashIds) { for (int hid : hashIds) {
if (tenant.getId() == dt.getTenantId() && if (tenant.getId() == dt.getTenantId() &&
taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId(), hid))) { taskInfo.getName().equals(TaskManagementUtil.generateNTaskName(dt.getDynamicTaskId(), hid))) {
isExist = true; isExist = true;
break; break;
} }

@ -782,9 +782,9 @@ CREATE TABLE IF NOT EXISTS DM_EXT_PERMISSION_MAPPING (
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
DYNAMIC_TASK_ID INTEGER AUTO_INCREMENT NOT NULL, DYNAMIC_TASK_ID INTEGER AUTO_INCREMENT NOT NULL,
NAME VARCHAR(300) DEFAULT NULL , NAME VARCHAR(300) DEFAULT NULL ,
CRON VARCHAR(8000) DEFAULT NULL, CRON VARCHAR(100) DEFAULT NULL,
IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE, IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL, TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL,
TENANT_ID INTEGER DEFAULT 0, TENANT_ID INTEGER DEFAULT 0,
PRIMARY KEY (DYNAMIC_TASK_ID) PRIMARY KEY (DYNAMIC_TASK_ID)
); );
@ -792,8 +792,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES (
DYNAMIC_TASK_ID INTEGER NOT NULL, DYNAMIC_TASK_ID INTEGER NOT NULL,
PROPERTY_NAME VARCHAR(100) DEFAULT 0, PROPERTY_NAME VARCHAR(100) DEFAULT 0,
PROPERTY_VALUE VARCHAR(100) DEFAULT NULL, PROPERTY_VALUE VARCHAR(8000) DEFAULT NULL,
TENANT_ID VARCHAR(100), TENANT_ID INTEGER DEFAULT 0,
PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID), PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID),
CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES
DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE

@ -852,10 +852,10 @@ CREATE TABLE DM_GEOFENCE_EVENT_MAPPING (
IF NOT EXISTS (SELECT * FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'[DBO].[DYNAMIC_TASK]') AND TYPE IN (N'U')) IF NOT EXISTS (SELECT * FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'[DBO].[DYNAMIC_TASK]') AND TYPE IN (N'U'))
CREATE TABLE DYNAMIC_TASK ( CREATE TABLE DYNAMIC_TASK (
DYNAMIC_TASK_ID INTEGER IDENTITY(1,1) NOT NULL, DYNAMIC_TASK_ID INTEGER IDENTITY(1,1) NOT NULL,
NAME VARCHAR(255) DEFAULT NULL , NAME VARCHAR(300) DEFAULT NULL ,
CRON VARCHAR(8000) DEFAULT NULL, CRON VARCHAR(100) DEFAULT NULL,
IS_ENABLED BIT NOT NULL DEFAULT 0, IS_ENABLED BIT NOT NULL DEFAULT 0,
TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL, TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL,
TENANT_ID INTEGER DEFAULT 0, TENANT_ID INTEGER DEFAULT 0,
PRIMARY KEY (DYNAMIC_TASK_ID) PRIMARY KEY (DYNAMIC_TASK_ID)
); );
@ -864,8 +864,8 @@ IF NOT EXISTS (SELECT * FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'[DBO].[D
CREATE TABLE DYNAMIC_TASK_PROPERTIES ( CREATE TABLE DYNAMIC_TASK_PROPERTIES (
DYNAMIC_TASK_ID INTEGER NOT NULL, DYNAMIC_TASK_ID INTEGER NOT NULL,
PROPERTY_NAME VARCHAR(100) DEFAULT 0, PROPERTY_NAME VARCHAR(100) DEFAULT 0,
PROPERTY_VALUE VARCHAR(100) DEFAULT NULL, PROPERTY_VALUE VARCHAR(8000) DEFAULT NULL,
TENANT_ID VARCHAR(100), TENANT_ID INTEGER DEFAULT 0,
PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID), PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID),
CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES
DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE

@ -853,9 +853,9 @@ CREATE TABLE IF NOT EXISTS DM_EXT_PERMISSION_MAPPING (
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
DYNAMIC_TASK_ID INTEGER AUTO_INCREMENT NOT NULL, DYNAMIC_TASK_ID INTEGER AUTO_INCREMENT NOT NULL,
NAME VARCHAR(300) DEFAULT NULL , NAME VARCHAR(300) DEFAULT NULL ,
CRON VARCHAR(8000) DEFAULT NULL, CRON VARCHAR(100) DEFAULT NULL,
IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE, IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL, TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL,
TENANT_ID INTEGER DEFAULT 0, TENANT_ID INTEGER DEFAULT 0,
PRIMARY KEY (DYNAMIC_TASK_ID) PRIMARY KEY (DYNAMIC_TASK_ID)
) ENGINE=InnoDB; ) ENGINE=InnoDB;
@ -863,8 +863,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES (
DYNAMIC_TASK_ID INTEGER NOT NULL, DYNAMIC_TASK_ID INTEGER NOT NULL,
PROPERTY_NAME VARCHAR(100) DEFAULT 0, PROPERTY_NAME VARCHAR(100) DEFAULT 0,
PROPERTY_VALUE VARCHAR(100) DEFAULT NULL, PROPERTY_VALUE TEXT DEFAULT NULL,
TENANT_ID VARCHAR(100), TENANT_ID INTEGER DEFAULT 0,
PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID), PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID),
CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES
DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE

@ -1127,9 +1127,9 @@ CREATE TABLE DM_GEOFENCE (
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
DYNAMIC_TASK_ID NUMBER(10) NOT NULL, DYNAMIC_TASK_ID NUMBER(10) NOT NULL,
NAME VARCHAR2(300) DEFAULT NULL , NAME VARCHAR2(300) DEFAULT NULL ,
CRON VARCHAR2(8000) DEFAULT NULL, CRON VARCHAR2(100) DEFAULT NULL,
IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE, IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TASK_CLASS_NAME VARCHAR2(8000) DEFAULT NULL, TASK_CLASS_NAME VARCHAR2(1000) DEFAULT NULL,
TENANT_ID INTEGER DEFAULT 0, TENANT_ID INTEGER DEFAULT 0,
CONSTRAINT PK_DYNAMIC_TASK PRIMARY KEY (DYNAMIC_TASK_ID) CONSTRAINT PK_DYNAMIC_TASK PRIMARY KEY (DYNAMIC_TASK_ID)
) ENGINE=InnoDB; ) ENGINE=InnoDB;
@ -1137,8 +1137,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES (
DYNAMIC_TASK_ID INTEGER NOT NULL, DYNAMIC_TASK_ID INTEGER NOT NULL,
PROPERTY_NAME VARCHAR2(100) DEFAULT 0, PROPERTY_NAME VARCHAR2(100) DEFAULT 0,
PROPERTY_VALUE VARCHAR2(100) DEFAULT NULL, PROPERTY_VALUE VARCHAR2(8000) DEFAULT NULL,
TENANT_ID VARCHAR2(100), TENANT_ID INTEGER DEFAULT 0,
CONSTRAINT PK_DYNAMIC_TASK_PROPERTIES PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID), CONSTRAINT PK_DYNAMIC_TASK_PROPERTIES PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID),
CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES
DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE

@ -772,9 +772,9 @@ CREATE TABLE IF NOT EXISTS DM_GEOFENCE (
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
DYNAMIC_TASK_ID INTEGER DEFAULT NEXTVAL ('DYNAMIC_TASK_seq') NOT NULL, DYNAMIC_TASK_ID INTEGER DEFAULT NEXTVAL ('DYNAMIC_TASK_seq') NOT NULL,
NAME VARCHAR(300) DEFAULT NULL , NAME VARCHAR(300) DEFAULT NULL ,
CRON VARCHAR(8000) DEFAULT NULL, CRON VARCHAR(100) DEFAULT NULL,
IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE, IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL, TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL,
TENANT_ID INTEGER DEFAULT 0, TENANT_ID INTEGER DEFAULT 0,
PRIMARY KEY (DYNAMIC_TASK_ID) PRIMARY KEY (DYNAMIC_TASK_ID)
) ENGINE=InnoDB; ) ENGINE=InnoDB;
@ -782,8 +782,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES (
DYNAMIC_TASK_ID INTEGER NOT NULL, DYNAMIC_TASK_ID INTEGER NOT NULL,
PROPERTY_NAME VARCHAR(100) DEFAULT 0, PROPERTY_NAME VARCHAR(100) DEFAULT 0,
PROPERTY_VALUE VARCHAR(100) DEFAULT NULL, PROPERTY_VALUE TEXT DEFAULT NULL,
TENANT_ID VARCHAR(100), TENANT_ID INTEGER DEFAULT 0,
PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID), PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID),
CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES
DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE

@ -894,6 +894,12 @@
<version>${jaxb.api.version}</version> <version>${jaxb.api.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.wso2.orbit.javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.1.wso2v1</version>
<scope>compile</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.axis2.transport</groupId> <groupId>org.apache.axis2.transport</groupId>

Loading…
Cancel
Save