From 4045e39a76137da68c636098a6bc5c1fbef89bb5 Mon Sep 17 00:00:00 2001 From: Charitha Goonetilleke Date: Wed, 21 Feb 2024 23:26:23 +0530 Subject: [PATCH] Fix inconsistencies in dynamic and random task execution --- .../task/impl/OperationTimeoutTask.java | 115 +++++----- .../mgt/core/task/impl/ArchivalTask.java | 14 +- .../task/impl/ArchivedDataDeletionTask.java | 15 +- .../task/impl/DeviceDetailsRetrieverTask.java | 1 + .../impl/DynamicPartitionedScheduleTask.java | 17 +- .../impl/RandomlyAssignedScheduleTask.java | 13 +- .../mgt/core/enforcement/DelegationTask.java | 2 +- .../policy/mgt/core/task/MonitoringTask.java | 2 +- .../mgt/common/constant/TaskMgtConstants.java | 1 + .../mgt/common/spi/TaskManagementService.java | 5 +- .../task/mgt/core/dao/DynamicTaskDAO.java | 12 +- .../task/mgt/core/dao/DynamicTaskPropDAO.java | 7 +- .../dao/common/TaskManagementDAOFactory.java | 10 +- .../mgt/core/dao/impl/DynamicTaskDAOImpl.java | 61 ++++-- .../core/dao/impl/DynamicTaskPropDAOImpl.java | 20 +- .../service/TaskManagementServiceImpl.java | 206 +++++++++++------- .../mgt/core/util/TaskManagementUtil.java | 40 +++- .../task/mgt/watcher/IoTSStartupHandler.java | 123 +++++------ .../src/main/resources/dbscripts/cdm/h2.sql | 8 +- .../main/resources/dbscripts/cdm/mssql.sql | 10 +- .../main/resources/dbscripts/cdm/mysql.sql | 8 +- .../main/resources/dbscripts/cdm/oracle.sql | 8 +- .../resources/dbscripts/cdm/postgresql.sql | 8 +- 23 files changed, 420 insertions(+), 286 deletions(-) diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java index 157795aa5a..335e0b56d6 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java @@ -18,9 +18,6 @@ package io.entgra.device.mgt.core.device.mgt.core.operation.timeout.task.impl; import com.google.gson.Gson; -import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException; import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.Activity; import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.ActivityStatus; @@ -29,75 +26,85 @@ import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.OperationManage import io.entgra.device.mgt.core.device.mgt.core.config.operation.timeout.OperationTimeout; import io.entgra.device.mgt.core.device.mgt.core.dto.DeviceType; import io.entgra.device.mgt.core.device.mgt.core.internal.DeviceManagementDataHolder; -import io.entgra.device.mgt.core.device.mgt.core.task.impl.DynamicPartitionedScheduleTask; +import io.entgra.device.mgt.core.device.mgt.core.task.impl.RandomlyAssignedScheduleTask; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.util.ArrayList; import java.util.List; +import java.util.Map; -public class OperationTimeoutTask extends DynamicPartitionedScheduleTask { +public class OperationTimeoutTask extends RandomlyAssignedScheduleTask { private static final Log log = LogFactory.getLog(OperationTimeoutTask.class); + public static final String OPERATION_TIMEOUT_TASK = "OPERATION_TIMEOUT_TASK"; + private Map properties; + + @Override + public final void setProperties(Map properties) { + this.properties = properties; + } + + public final String getProperty(String name) { + if (properties == null) { + return null; + } + return properties.get(name); + } + @Override protected void setup() { } @Override - protected void executeDynamicTask() { - if (isQualifiedToExecuteTask()) { // this task will run only in one node when the deployment has multiple nodes - String operationTimeoutTaskConfigStr = getProperty( - OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG); - Gson gson = new Gson(); - OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class); - try { - long timeMillis = System.currentTimeMillis() - (long) operationTimeoutConfig.getTimeout(); - List deviceTypes = new ArrayList<>(); - if (operationTimeoutConfig.getDeviceTypes().size() == 1 && - "ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) { - try { - List deviceTypeList = DeviceManagementDataHolder.getInstance() - .getDeviceManagementProvider().getDeviceTypes(); - for (DeviceType deviceType : deviceTypeList) { - deviceTypes.add(deviceType.getName()); - } - } catch (DeviceManagementException e) { - log.error("Error occurred while reading device types", e); + public String getTaskName() { + return OPERATION_TIMEOUT_TASK; + } + + @Override + protected void executeRandomlyAssignedTask() { +// this task will run only in one node when the deployment has multiple nodes + String operationTimeoutTaskConfigStr = getProperty( + OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG); + Gson gson = new Gson(); + OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class); + try { + long timeMillis = System.currentTimeMillis() - (long) operationTimeoutConfig.getTimeout(); + List deviceTypes = new ArrayList<>(); + if (operationTimeoutConfig.getDeviceTypes().size() == 1 && + "ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) { + try { + List deviceTypeList = DeviceManagementDataHolder.getInstance() + .getDeviceManagementProvider().getDeviceTypes(); + for (DeviceType deviceType : deviceTypeList) { + deviceTypes.add(deviceType.getName()); } - } else { - deviceTypes = operationTimeoutConfig.getDeviceTypes(); + } catch (DeviceManagementException e) { + log.error("Error occurred while reading device types", e); } - List activities = DeviceManagementDataHolder.getInstance().getOperationManager() - .getActivities(deviceTypes, operationTimeoutConfig.getCode(), timeMillis, - operationTimeoutConfig.getInitialStatus()); - for (Activity activity : activities) { - for (ActivityStatus activityStatus : activity.getActivityStatus()) { - String operationId = activity.getActivityId().replace("ACTIVITY_", ""); - Operation operation = DeviceManagementDataHolder.getInstance().getOperationManager() - .getOperation(Integer.parseInt(operationId)); - operation.setStatus(Operation.Status.valueOf(operationTimeoutConfig.getNextStatus())); - DeviceManagementDataHolder.getInstance().getOperationManager() - .updateOperation(activityStatus.getDeviceIdentifier(), operation); - } + } else { + deviceTypes = operationTimeoutConfig.getDeviceTypes(); + } + List activities = DeviceManagementDataHolder.getInstance().getOperationManager() + .getActivities(deviceTypes, operationTimeoutConfig.getCode(), timeMillis, + operationTimeoutConfig.getInitialStatus()); + for (Activity activity : activities) { + for (ActivityStatus activityStatus : activity.getActivityStatus()) { + String operationId = activity.getActivityId().replace("ACTIVITY_", ""); + Operation operation = DeviceManagementDataHolder.getInstance().getOperationManager() + .getOperation(Integer.parseInt(operationId)); + operation.setStatus(Operation.Status.valueOf(operationTimeoutConfig.getNextStatus())); + DeviceManagementDataHolder.getInstance().getOperationManager() + .updateOperation(activityStatus.getDeviceIdentifier(), operation); } - - } catch (OperationManagementException e) { - String msg = "Error occurred while retrieving operations."; - log.error(msg, e); } + + } catch (OperationManagementException e) { + String msg = "Error occurred while retrieving operations."; + log.error(msg, e); } } - private boolean isQualifiedToExecuteTask() { - if (isDynamicTaskEligible()) { - try { - return DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask(); - } catch (HeartBeatManagementException e) { - log.error("Error while checking is qualified to execute task", e); - } - } else { - return true; - } - return false; - } } diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/ArchivalTask.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/ArchivalTask.java index bd1b614ca6..f4048ac905 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/ArchivalTask.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/ArchivalTask.java @@ -23,16 +23,16 @@ import org.apache.commons.logging.LogFactory; import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalException; import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalService; import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalServiceImpl; -import org.wso2.carbon.ntask.core.Task; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; import java.util.concurrent.TimeUnit; -public class ArchivalTask implements Task { +public class ArchivalTask extends RandomlyAssignedScheduleTask { private static final Log log = LogFactory.getLog(ArchivalTask.class); + private static final String TASK_NAME = "DATA_ARCHIVAL_TASK"; private ArchivalService archivalService; @@ -42,12 +42,12 @@ public class ArchivalTask implements Task { } @Override - public void init() { + protected void setup() { this.archivalService = new ArchivalServiceImpl(); } @Override - public void execute() { + protected void executeRandomlyAssignedTask() { log.info("Executing ArchivalTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date())); long startTime = System.currentTimeMillis(); try { @@ -60,6 +60,11 @@ public class ArchivalTask implements Task { log.info("ArchivalTask completed. Total execution time: " + getDurationBreakdown(difference)); } + @Override + public String getTaskName() { + return TASK_NAME; + } + private String getDurationBreakdown(long millis) { if (millis < 0) { throw new IllegalArgumentException("Duration must be greater than zero!"); @@ -74,4 +79,5 @@ public class ArchivalTask implements Task { return (days + " Days " + hours + " Hours " + minutes + " Minutes " + seconds + " Seconds"); } + } diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/ArchivedDataDeletionTask.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/ArchivedDataDeletionTask.java index 5282467a91..ce507ed52b 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/ArchivedDataDeletionTask.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/ArchivedDataDeletionTask.java @@ -29,9 +29,10 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; -public class ArchivedDataDeletionTask implements Task { +public class ArchivedDataDeletionTask extends RandomlyAssignedScheduleTask { - private static Log log = LogFactory.getLog(ArchivedDataDeletionTask.class); + private static final Log log = LogFactory.getLog(ArchivedDataDeletionTask.class); + private static final String TASK_NAME = "ARCHIVED_DATA_CLEANUP_TASK"; private ArchivalService archivalService; @@ -41,12 +42,12 @@ public class ArchivedDataDeletionTask implements Task { } @Override - public void init() { + public void setup() { this.archivalService = new ArchivalServiceImpl(); } @Override - public void execute() { + protected void executeRandomlyAssignedTask() { log.info("Executing DataDeletionTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date())); long startTime = System.nanoTime(); try { @@ -58,4 +59,10 @@ public class ArchivedDataDeletionTask implements Task { long difference = (endTime - startTime) / (1000000 * 1000); log.info("DataDeletionTask completed. Total execution time: " + difference + " seconds"); } + + @Override + public String getTaskName() { + return TASK_NAME; + } + } diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java index e267bf4353..9ddeea4e7a 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java @@ -109,4 +109,5 @@ public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask { protected void setup() { } + } diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java index 9a5338bf35..6ca728cd13 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java @@ -53,7 +53,7 @@ public abstract class DynamicPartitionedScheduleTask implements Task { public final void init() { try { boolean dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled(); - if(dynamicTaskEnabled){ + if (dynamicTaskEnabled) { taskContext = new DynamicTaskContext(); taskContext.setPartitioningEnabled(true); } else { @@ -75,9 +75,9 @@ public abstract class DynamicPartitionedScheduleTask implements Task { String localHashIndex = getProperty(TaskMgtConstants.Task.LOCAL_HASH_INDEX); // These tasks are not dynamically scheduled. They are added via a config so scheduled in each node // during the server startup - if (localHashIndex == null ) { + if (localHashIndex == null) { if (log.isDebugEnabled()) { - log.debug("Executing startup scheduled task (" + getTaskName() + ") with class: " + + log.debug("Executing startup scheduled task (" + getTaskName() + ") with class: " + this.getClass().getName()); } executeDynamicTask(); @@ -116,7 +116,7 @@ public abstract class DynamicPartitionedScheduleTask implements Task { private void updateContext() throws HeartBeatManagementException { ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo(); - if(ctxInfo != null) { + if (ctxInfo != null) { populateContext(ctxInfo); } else { log.info("Dynamic Task Context not present. Tasks will run on regular worker/manager mode."); @@ -127,10 +127,10 @@ public abstract class DynamicPartitionedScheduleTask implements Task { taskContext.setActiveServerCount(ctxInfo.getActiveServerCount()); taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx()); - if(log.isDebugEnabled()){ + if (log.isDebugEnabled()) { log.debug("Initiating execution of dynamic task for server : " + taskContext.getServerHashIndex() + - " where active server count is : " + taskContext.getActiveServerCount() + - " partitioning task enabled : " + taskContext.isPartitioningEnabled()); + " where active server count is : " + taskContext.getActiveServerCount() + + " partitioning task enabled : " + taskContext.isPartitioningEnabled()); } } @@ -142,7 +142,8 @@ public abstract class DynamicPartitionedScheduleTask implements Task { return taskContext; } - public static boolean isDynamicTaskEligible(){ + @Deprecated + public static boolean isDynamicTaskEligible() { return taskContext != null && taskContext.isPartitioningEnabled(); } diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/RandomlyAssignedScheduleTask.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/RandomlyAssignedScheduleTask.java index 959b2965e1..a2d8b1ad7c 100644 --- a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/RandomlyAssignedScheduleTask.java +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.core/src/main/java/io/entgra/device/mgt/core/device/mgt/core/task/impl/RandomlyAssignedScheduleTask.java @@ -38,7 +38,7 @@ public abstract class RandomlyAssignedScheduleTask implements Task { try { dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled(); } catch (HeartBeatManagementException e) { - log.error("Error Instantiating Variables necessary for Randomly Assigned Task Scheduling." , e); + log.error("Error Instantiating Variables necessary for Randomly Assigned Task Scheduling.", e); } //This is done so that sub class extending this abstract class is forced to specify a task name. taskName = getTaskName(); @@ -48,18 +48,20 @@ public abstract class RandomlyAssignedScheduleTask implements Task { @Override public final void execute() { refreshContext(); - executeRandomlyAssignedTask(); + if (isQualifiedToExecuteTask()) { + executeRandomlyAssignedTask(); + } } - public void refreshContext(){ - if(dynamicTaskEnabled) { + public void refreshContext() { + if (dynamicTaskEnabled) { try { qualifiedToExecuteTask = DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask(); log.info("## NODE Qualified to execute Randomly Assigned Task : " + taskName); DeviceManagementDataHolder.getInstance().getHeartBeatService().updateTaskExecutionAcknowledgement(taskName); } catch (HeartBeatManagementException e) { log.error("Error refreshing Variables necessary for Randomly Assigned Scheduled Task. " + - "Dynamic Tasks will not function.", e); + "Dynamic Tasks will not function.", e); } } else { qualifiedToExecuteTask = true; @@ -75,4 +77,5 @@ public abstract class RandomlyAssignedScheduleTask implements Task { } public abstract String getTaskName(); + } diff --git a/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/main/java/io/entgra/device/mgt/core/policy/mgt/core/enforcement/DelegationTask.java b/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/main/java/io/entgra/device/mgt/core/policy/mgt/core/enforcement/DelegationTask.java index c1bdd4fba7..dfa5ae4f5e 100644 --- a/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/main/java/io/entgra/device/mgt/core/policy/mgt/core/enforcement/DelegationTask.java +++ b/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/main/java/io/entgra/device/mgt/core/policy/mgt/core/enforcement/DelegationTask.java @@ -64,7 +64,7 @@ public class DelegationTask extends DynamicPartitionedScheduleTask { try { devices = new ArrayList<>(); toBeNotified = new ArrayList<>(); - if (isDynamicTaskEligible()) { + if(getTaskContext() != null && getTaskContext().isPartitioningEnabled()){ devices.addAll(service.getAllocatedDevices(deviceType, getTaskContext().getActiveServerCount(), getTaskContext().getServerHashIndex())); diff --git a/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/main/java/io/entgra/device/mgt/core/policy/mgt/core/task/MonitoringTask.java b/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/main/java/io/entgra/device/mgt/core/policy/mgt/core/task/MonitoringTask.java index 8836b00892..42b6e745ab 100644 --- a/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/main/java/io/entgra/device/mgt/core/policy/mgt/core/task/MonitoringTask.java +++ b/components/policy-mgt/io.entgra.device.mgt.core.policy.mgt.core/src/main/java/io/entgra/device/mgt/core/policy/mgt/core/task/MonitoringTask.java @@ -105,7 +105,7 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask { PolicyManagementDataHolder.getInstance().getDeviceManagementService() .getPolicyMonitoringManager(deviceType); List devices; - if(isDynamicTaskEligible()){ + if (getTaskContext() != null && getTaskContext().isPartitioningEnabled()) { devices = deviceManagementProviderService .getAllocatedDevices(deviceType, getTaskContext().getActiveServerCount(), getTaskContext().getServerHashIndex()); diff --git a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.common/src/main/java/io/entgra/device/mgt/core/task/mgt/common/constant/TaskMgtConstants.java b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.common/src/main/java/io/entgra/device/mgt/core/task/mgt/common/constant/TaskMgtConstants.java index e5458b6a06..0a6e33379e 100755 --- a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.common/src/main/java/io/entgra/device/mgt/core/task/mgt/common/constant/TaskMgtConstants.java +++ b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.common/src/main/java/io/entgra/device/mgt/core/task/mgt/common/constant/TaskMgtConstants.java @@ -49,5 +49,6 @@ public class TaskMgtConstants { public static final String TENANT_ID_PROP = "__TENANT_ID_PROP__"; public static final String LOCAL_HASH_INDEX = "__LOCAL_HASH_INDEX__"; public static final String LOCAL_TASK_NAME = "__LOCAL_TASK_NAME__"; + public static final String DYNAMIC_TASK_ID = "__DYNAMIC_TASK_ID__"; } } diff --git a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.common/src/main/java/io/entgra/device/mgt/core/task/mgt/common/spi/TaskManagementService.java b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.common/src/main/java/io/entgra/device/mgt/core/task/mgt/common/spi/TaskManagementService.java index f07cb49083..ab8a02199f 100755 --- a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.common/src/main/java/io/entgra/device/mgt/core/task/mgt/common/spi/TaskManagementService.java +++ b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.common/src/main/java/io/entgra/device/mgt/core/task/mgt/common/spi/TaskManagementService.java @@ -22,6 +22,7 @@ import io.entgra.device.mgt.core.task.mgt.common.exception.TaskNotFoundException import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementException; import java.util.List; +import java.util.Map; public interface TaskManagementService { @@ -37,7 +38,9 @@ public interface TaskManagementService { List getAllDynamicTasks() throws TaskManagementException; - DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementException; + Map> getDynamicTasksForAllTenants() throws TaskManagementException; + + DynamicTask getDynamicTask(int dynamicTaskId) throws TaskManagementException; List getActiveDynamicTasks() throws TaskManagementException; } diff --git a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/DynamicTaskDAO.java b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/DynamicTaskDAO.java index 759de555b4..7493e9a025 100755 --- a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/DynamicTaskDAO.java +++ b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/DynamicTaskDAO.java @@ -27,16 +27,18 @@ import java.util.List; */ public interface DynamicTaskDAO { - int addTask(DynamicTask dynamicTask) throws TaskManagementDAOException; + int addTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException; - boolean updateDynamicTask(DynamicTask dynamicTask) throws TaskManagementDAOException; + boolean updateDynamicTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException; - void deleteDynamicTask(int dynamicTaskId) throws TaskManagementDAOException; + void deleteDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException; - DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementDAOException; + DynamicTask getDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException; List getAllDynamicTasks() throws TaskManagementDAOException; - List getActiveDynamicTasks() throws TaskManagementDAOException; + List getAllDynamicTasks(int tenantId) throws TaskManagementDAOException; + + List getActiveDynamicTasks(int tenantId) throws TaskManagementDAOException; } diff --git a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/DynamicTaskPropDAO.java b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/DynamicTaskPropDAO.java index 7a448da0fa..1ef3fd5a91 100755 --- a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/DynamicTaskPropDAO.java +++ b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/DynamicTaskPropDAO.java @@ -26,10 +26,11 @@ import java.util.Map; */ public interface DynamicTaskPropDAO { - void addTaskProperties(int dynamicTaskId, Map properties) throws TaskManagementDAOException; + void addTaskProperties(int dynamicTaskId, Map properties, int tenantId) + throws TaskManagementDAOException; - Map getDynamicTaskProps(int dynamicTaskId) throws TaskManagementDAOException; + Map getDynamicTaskProps(int dynamicTaskId, int tenantId) throws TaskManagementDAOException; - void updateDynamicTaskProps(int dynamicTaskId, Map properties) + void updateDynamicTaskProps(int dynamicTaskId, Map properties, int tenantId) throws TaskManagementDAOException; } diff --git a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/common/TaskManagementDAOFactory.java b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/common/TaskManagementDAOFactory.java index 7c0aa2ff0b..e70e41689b 100755 --- a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/common/TaskManagementDAOFactory.java +++ b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/common/TaskManagementDAOFactory.java @@ -103,18 +103,22 @@ public class TaskManagementDAOFactory { conn.setAutoCommit(false); currentConnection.set(conn); } catch (SQLException e) { - throw new TransactionManagementException("Error occurred while retrieving config.datasource connection", e); + throw new TransactionManagementException("Error occurred while retrieving datasource connection", e); } } - public static void openConnection() throws SQLException { + public static void openConnection() throws TransactionManagementException { Connection conn = currentConnection.get(); if (conn != null) { throw new IllegalTransactionStateException("A transaction is already active within the context of " + "this particular thread. Therefore, calling 'beginTransaction/openConnection' while another " + "transaction is already active is a sign of improper transaction handling"); } - conn = dataSource.getConnection(); + try { + conn = dataSource.getConnection(); + } catch (SQLException e) { + throw new TransactionManagementException("Error occurred while retrieving datasource connection", e); + } currentConnection.set(conn); } diff --git a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/impl/DynamicTaskDAOImpl.java b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/impl/DynamicTaskDAOImpl.java index f115176717..843b5c9189 100755 --- a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/impl/DynamicTaskDAOImpl.java +++ b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/impl/DynamicTaskDAOImpl.java @@ -17,16 +17,19 @@ */ package io.entgra.device.mgt.core.task.mgt.core.dao.impl; -import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory; -import io.entgra.device.mgt.core.task.mgt.core.dao.util.TaskManagementDAOUtil; import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask; import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementDAOException; import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskDAO; +import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory; +import io.entgra.device.mgt.core.task.mgt.core.dao.util.TaskManagementDAOUtil; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.wso2.carbon.context.PrivilegedCarbonContext; -import java.sql.*; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.List; @@ -34,9 +37,9 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO { private static final Log log = LogFactory.getLog(DynamicTaskDAOImpl.class); @Override - public int addTask(DynamicTask dynamicTask) throws TaskManagementDAOException { + public int addTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException { PreparedStatement stmt = null; - ResultSet rs = null; + ResultSet rs; int taskId = -1; try { Connection conn = TaskManagementDAOFactory.getConnection(); @@ -48,13 +51,14 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO { stmt.setString(2, dynamicTask.getName()); stmt.setBoolean(3, dynamicTask.isEnabled()); stmt.setString(4, dynamicTask.getTaskClassName()); - stmt.setInt(5, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()); + stmt.setInt(5, tenantId); stmt.executeUpdate(); rs = stmt.getGeneratedKeys(); if (rs.next()) { taskId = rs.getInt(1); } + dynamicTask.setDynamicTaskId(taskId); return taskId; } catch (SQLException e) { String msg = "Error occurred while inserting task '" + dynamicTask.getName() + "'"; @@ -66,16 +70,17 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO { } @Override - public boolean updateDynamicTask(DynamicTask dynamicTask) throws TaskManagementDAOException { + public boolean updateDynamicTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException { PreparedStatement stmt = null; int rows; try { Connection conn = TaskManagementDAOFactory.getConnection(); - String sql = "UPDATE DYNAMIC_TASK SET CRON = ?,IS_ENABLED = ? WHERE DYNAMIC_TASK_ID = ?"; + String sql = "UPDATE DYNAMIC_TASK SET CRON = ?,IS_ENABLED = ? WHERE DYNAMIC_TASK_ID = ? AND TENANT_ID = ?"; stmt = conn.prepareStatement(sql); stmt.setString(1, dynamicTask.getCronExpression()); stmt.setBoolean(2, dynamicTask.isEnabled()); stmt.setInt(3, dynamicTask.getDynamicTaskId()); + stmt.setInt(4, tenantId); rows = stmt.executeUpdate(); return (rows > 0); } catch (SQLException e) { @@ -87,9 +92,8 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO { } } - @Override - public void deleteDynamicTask(int dynamicTaskId) throws TaskManagementDAOException { + public void deleteDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException { if (log.isDebugEnabled()) { log.debug("Request received in DAO Layer to delete dynamic task with the id: " + dynamicTaskId); } @@ -98,7 +102,7 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO { Connection conn = TaskManagementDAOFactory.getConnection(); try (PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { stmt.setInt(1, dynamicTaskId); - stmt.setInt(2, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()); + stmt.setInt(2, tenantId); stmt.executeUpdate(); } } catch (SQLException e) { @@ -110,7 +114,7 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO { } @Override - public DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementDAOException { + public DynamicTask getDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException { DynamicTask dynamicTask = null; try { Connection conn = TaskManagementDAOFactory.getConnection(); @@ -118,7 +122,7 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO { try (PreparedStatement stmt = conn.prepareStatement(sql)) { stmt.setInt(1, dynamicTaskId); - stmt.setInt(2, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()); + stmt.setInt(2, tenantId); try (ResultSet rs = stmt.executeQuery()) { if (rs.next()) { dynamicTask = TaskManagementDAOUtil.loadDynamicTask(rs); @@ -155,13 +159,35 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO { } @Override - public List getActiveDynamicTasks() throws TaskManagementDAOException { - List dynamicTasks = null; + public List getAllDynamicTasks(int tenantId) throws TaskManagementDAOException { + List dynamicTasks; + try { + Connection conn = TaskManagementDAOFactory.getConnection(); + String sql = "SELECT * FROM DYNAMIC_TASK WHERE TENANT_ID = ?"; + + try (PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setInt(1, tenantId); + try (ResultSet rs = stmt.executeQuery()) { + dynamicTasks = TaskManagementDAOUtil.loadDynamicTasks(rs); + } + } + } catch (SQLException e) { + String msg = "Error occurred while getting all dynamic task data "; + log.error(msg, e); + throw new TaskManagementDAOException(msg, e); + } + return dynamicTasks; + } + + @Override + public List getActiveDynamicTasks(int tenantId) throws TaskManagementDAOException { + List dynamicTasks; try { Connection conn = TaskManagementDAOFactory.getConnection(); - String sql = "SELECT * FROM DYNAMIC_TASK WHERE IS_ENABLED = 'true' "; + String sql = "SELECT * FROM DYNAMIC_TASK WHERE IS_ENABLED = 'true' AND TENANT_ID = ?"; try (PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setInt(1, tenantId); try (ResultSet rs = stmt.executeQuery()) { dynamicTasks = TaskManagementDAOUtil.loadDynamicTasks(rs); } @@ -173,4 +199,5 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO { } return dynamicTasks; } + } diff --git a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/impl/DynamicTaskPropDAOImpl.java b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/impl/DynamicTaskPropDAOImpl.java index 10d502f322..5979832130 100755 --- a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/impl/DynamicTaskPropDAOImpl.java +++ b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/dao/impl/DynamicTaskPropDAOImpl.java @@ -38,9 +38,9 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO { private static final Log log = LogFactory.getLog(DynamicTaskPropDAOImpl.class); @Override - public void addTaskProperties(int taskId, Map properties) + public void addTaskProperties(int taskId, Map properties, int tenantId) throws TaskManagementDAOException { - Connection conn = null; + Connection conn; PreparedStatement stmt = null; try { conn = TaskManagementDAOFactory.getConnection(); @@ -51,7 +51,7 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO { stmt.setInt(1, taskId); stmt.setString(2, propertyKey); stmt.setString(3, properties.get(propertyKey)); - stmt.setInt(4, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()); + stmt.setInt(4, tenantId); stmt.addBatch(); } stmt.executeBatch(); @@ -64,17 +64,17 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO { } } - - public Map getDynamicTaskProps(int dynamicTaskId) throws TaskManagementDAOException { - Connection conn = null; + public Map getDynamicTaskProps(int dynamicTaskId, int tenantId) throws TaskManagementDAOException { + Connection conn; PreparedStatement stmt = null; ResultSet resultSet = null; Map properties; try { conn = TaskManagementDAOFactory.getConnection(); stmt = conn.prepareStatement( - "SELECT * FROM DYNAMIC_TASK_PROPERTIES WHERE DYNAMIC_TASK_ID = ?"); + "SELECT * FROM DYNAMIC_TASK_PROPERTIES WHERE DYNAMIC_TASK_ID = ? AND TENANT_ID = ?"); stmt.setInt(1, dynamicTaskId); + stmt.setInt(2, tenantId); resultSet = stmt.executeQuery(); properties = new HashMap<>(); while (resultSet.next()) { @@ -92,7 +92,7 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO { } @Override - public void updateDynamicTaskProps(int dynamicTaskId, Map properties) + public void updateDynamicTaskProps(int dynamicTaskId, Map properties, int tenantId) throws TaskManagementDAOException { if (properties.isEmpty()) { if (log.isDebugEnabled()) { @@ -105,12 +105,13 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO { try { conn = TaskManagementDAOFactory.getConnection(); stmt = conn.prepareStatement("UPDATE DYNAMIC_TASK_PROPERTIES SET PROPERTY_VALUE = ? " + - "WHERE DYNAMIC_TASK_ID = ? AND PROPERTY_NAME = ?"); + "WHERE DYNAMIC_TASK_ID = ? AND PROPERTY_NAME = ? AND TENANT_ID = ?"); for (Map.Entry entry : properties.entrySet()) { stmt.setString(1, entry.getValue()); stmt.setInt(2, dynamicTaskId); stmt.setString(3, entry.getKey()); + stmt.setInt(4, tenantId); stmt.addBatch(); } stmt.executeBatch(); @@ -121,4 +122,5 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO { TaskManagementDAOUtil.cleanupResources(stmt, null); } } + } diff --git a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/service/TaskManagementServiceImpl.java b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/service/TaskManagementServiceImpl.java index 4a8252a36b..53d19105df 100755 --- a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/service/TaskManagementServiceImpl.java +++ b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/service/TaskManagementServiceImpl.java @@ -17,10 +17,6 @@ */ package io.entgra.device.mgt.core.task.mgt.core.service; -import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskPropDAO; -import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory; -import io.entgra.device.mgt.core.task.mgt.core.internal.TaskManagerDataHolder; -import io.entgra.device.mgt.core.task.mgt.core.util.TaskManagementUtil; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask; import io.entgra.device.mgt.core.task.mgt.common.constant.TaskMgtConstants; @@ -30,14 +26,21 @@ import io.entgra.device.mgt.core.task.mgt.common.exception.TaskNotFoundException import io.entgra.device.mgt.core.task.mgt.common.exception.TransactionManagementException; import io.entgra.device.mgt.core.task.mgt.common.spi.TaskManagementService; import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskDAO; +import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskPropDAO; +import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory; +import io.entgra.device.mgt.core.task.mgt.core.internal.TaskManagerDataHolder; +import io.entgra.device.mgt.core.task.mgt.core.util.TaskManagementUtil; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.context.PrivilegedCarbonContext; import org.wso2.carbon.ntask.common.TaskException; import org.wso2.carbon.ntask.core.TaskInfo; import org.wso2.carbon.ntask.core.TaskManager; import org.wso2.carbon.ntask.core.service.TaskService; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -78,42 +81,43 @@ public class TaskManagementServiceImpl implements TaskManagementService { @Override public void createTask(DynamicTask dynamicTask) throws TaskManagementException { - String taskId; + String nTaskName; + int dynamicTaskId; + int serverHashIdx; + int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(); try { // add into the dynamic task tables TaskManagementDAOFactory.beginTransaction(); - int dynamicTaskId = dynamicTaskDAO.addTask(dynamicTask); - - Map taskProperties = dynamicTask.getProperties(); - dynamicTaskPropDAO.addTaskProperties(dynamicTaskId, taskProperties); - - // add into the ntask core - taskId = TaskManagementUtil.generateTaskId(dynamicTaskId); + dynamicTaskId = dynamicTaskDAO.addTask(dynamicTask, tenantId); + dynamicTaskPropDAO.addTaskProperties(dynamicTaskId, dynamicTask.getProperties(), tenantId); try { - int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService() + serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService() .getServerCtxInfo().getLocalServerHashIdx(); - taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); - taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, taskId); + nTaskName = TaskManagementUtil.generateNTaskName(dynamicTaskId, serverHashIdx); } catch (HeartBeatManagementException e) { String msg = "Unexpected exception when getting server hash index."; log.error(msg, e); throw new TaskManagementException(msg, e); } - if (!isTaskExists(taskId)) { - TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); - triggerInfo.setCronExpression(dynamicTask.getCronExpression()); - TaskInfo taskInfo = new TaskInfo(taskId, dynamicTask.getTaskClassName(), taskProperties, triggerInfo); - taskManager.registerTask(taskInfo); - taskManager.scheduleTask(taskId); - if (!dynamicTask.isEnabled()) { - taskManager.pauseTask(taskId); - } - } else { - String msg = "Task '" + taskId + "' is already exists in the ntask core " - + "Hence not creating another task for the same name."; - log.error(msg); + if (isTaskExists(nTaskName)) { + String msg = "Task '" + nTaskName + "' is already exists in the ntask core. " + + "Hence removing existing task from nTask before adding new one."; + log.warn(msg); + taskManager.deleteTask(nTaskName); + } + + // add into the ntask core + Map taskProperties = TaskManagementUtil + .populateNTaskProperties(dynamicTask, nTaskName, serverHashIdx); + TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); + triggerInfo.setCronExpression(dynamicTask.getCronExpression()); + TaskInfo taskInfo = new TaskInfo(nTaskName, dynamicTask.getTaskClassName(), taskProperties, triggerInfo); + taskManager.registerTask(taskInfo); + taskManager.scheduleTask(nTaskName); + if (!dynamicTask.isEnabled()) { + taskManager.pauseTask(nTaskName); } TaskManagementDAOFactory.commitTransaction(); @@ -137,19 +141,20 @@ public class TaskManagementServiceImpl implements TaskManagementService { } @Override - public void updateTask(int dynamicTaskId, DynamicTask dynamicTask) throws TaskManagementException - , TaskNotFoundException { + public void updateTask(int dynamicTaskId, DynamicTask dynamicTask) + throws TaskManagementException, TaskNotFoundException { + int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(); try { //Update dynamic task table TaskManagementDAOFactory.beginTransaction(); - DynamicTask existingTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId); + DynamicTask existingTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId); if (existingTask != null) { existingTask.setEnabled(dynamicTask.isEnabled()); existingTask.setCronExpression(dynamicTask.getCronExpression()); - dynamicTaskDAO.updateDynamicTask(existingTask); + dynamicTaskDAO.updateDynamicTask(existingTask, tenantId); if (!dynamicTask.getProperties().isEmpty()) { - dynamicTaskPropDAO.updateDynamicTaskProps(dynamicTaskId, dynamicTask.getProperties()); + dynamicTaskPropDAO.updateDynamicTaskProps(dynamicTaskId, dynamicTask.getProperties(), tenantId); } } else { String msg = "Task '" + dynamicTaskId + "' is not exists in the dynamic task table."; @@ -158,12 +163,14 @@ public class TaskManagementServiceImpl implements TaskManagementService { } // Update task in the ntask core - String taskId = TaskManagementUtil.generateTaskId(existingTask.getDynamicTaskId()); - if (isTaskExists(taskId)) { - TaskInfo taskInfo = taskManager.getTask(taskId); - if (!dynamicTask.getProperties().isEmpty()) { - taskInfo.setProperties(dynamicTask.getProperties()); - } + String nTaskName = TaskManagementUtil.generateNTaskName(existingTask.getDynamicTaskId()); + if (isTaskExists(nTaskName)) { + TaskInfo taskInfo = taskManager.getTask(nTaskName); + + Map taskProperties = TaskManagementUtil + .populateNTaskProperties(dynamicTask, nTaskName); + taskInfo.setProperties(taskProperties); + TaskInfo.TriggerInfo triggerInfo; if (taskInfo.getTriggerInfo() == null) { triggerInfo = new TaskInfo.TriggerInfo(); @@ -173,9 +180,9 @@ public class TaskManagementServiceImpl implements TaskManagementService { triggerInfo.setCronExpression(dynamicTask.getCronExpression()); taskInfo.setTriggerInfo(triggerInfo); taskManager.registerTask(taskInfo); - taskManager.rescheduleTask(taskId); + taskManager.rescheduleTask(nTaskName); } else { - String msg = "Task '" + taskId + "' is not exists in the n task core " + String msg = "Task '" + nTaskName + "' is not exists in the n task core " + "Hence cannot update the task."; log.error(msg); } @@ -200,16 +207,17 @@ public class TaskManagementServiceImpl implements TaskManagementService { } @Override - public void toggleTask(int dynamicTaskId, boolean isEnabled) throws TaskManagementException - , TaskNotFoundException { + public void toggleTask(int dynamicTaskId, boolean isEnabled) + throws TaskManagementException, TaskNotFoundException { + int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(); try { //update dynamic task table TaskManagementDAOFactory.beginTransaction(); - DynamicTask existingTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId); + DynamicTask existingTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId); if (existingTask != null) { existingTask.setEnabled(isEnabled); - dynamicTaskDAO.updateDynamicTask(existingTask); + dynamicTaskDAO.updateDynamicTask(existingTask, tenantId); } else { String msg = "Task '" + dynamicTaskId + "' is not exists."; log.error(msg); @@ -217,15 +225,15 @@ public class TaskManagementServiceImpl implements TaskManagementService { } // Update task in the ntask core - String taskId = TaskManagementUtil.generateTaskId(existingTask.getDynamicTaskId()); - if (isTaskExists(taskId)) { + String taskName = TaskManagementUtil.generateNTaskName(existingTask.getDynamicTaskId()); + if (isTaskExists(taskName)) { if (isEnabled) { - taskManager.resumeTask(taskId); + taskManager.resumeTask(taskName); } else { - taskManager.pauseTask(taskId); + taskManager.pauseTask(taskName); } } else { - String msg = "Task '" + taskId + "' is not exists in the ntask core " + String msg = "Task '" + taskName + "' is not exists in the ntask core " + "Hence cannot toggle the task in the ntask."; log.error(msg); } @@ -251,22 +259,23 @@ public class TaskManagementServiceImpl implements TaskManagementService { @Override public void deleteTask(int dynamicTaskId) throws TaskManagementException, TaskNotFoundException { // delete task from dynamic task table + int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(); try { TaskManagementDAOFactory.beginTransaction(); - DynamicTask existingTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId); + DynamicTask existingTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId); if (existingTask != null) { - dynamicTaskDAO.deleteDynamicTask(dynamicTaskId); + dynamicTaskDAO.deleteDynamicTask(dynamicTaskId, tenantId); } else { String msg = "Task '" + dynamicTaskId + "' is not exists."; log.error(msg); throw new TaskNotFoundException(msg); } - String taskId = TaskManagementUtil.generateTaskId(existingTask.getDynamicTaskId()); - if (isTaskExists(taskId)) { - taskManager.deleteTask(taskId); + String taskName = TaskManagementUtil.generateNTaskName(existingTask.getDynamicTaskId()); + if (isTaskExists(taskName)) { + taskManager.deleteTask(taskName); } else { - String msg = "Task '" + taskId + "' is not exists in the ntask core " + String msg = "Task '" + taskName + "' is not exists in the ntask core " + "Hence cannot delete from the ntask core."; log.error(msg); } @@ -292,22 +301,21 @@ public class TaskManagementServiceImpl implements TaskManagementService { @Override public List getAllDynamicTasks() throws TaskManagementException { + int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(); List dynamicTasks; try { - if (log.isDebugEnabled()) { - log.debug("Fetching the details of all dynamic tasks"); + if (log.isTraceEnabled()) { + log.trace("Fetching the details of all dynamic tasks"); } - TaskManagementDAOFactory.beginTransaction(); - dynamicTasks = dynamicTaskDAO.getAllDynamicTasks(); + TaskManagementDAOFactory.openConnection(); + dynamicTasks = dynamicTaskDAO.getAllDynamicTasks(tenantId); if (dynamicTasks != null) { for (DynamicTask dynamicTask : dynamicTasks) { dynamicTask.setProperties(dynamicTaskPropDAO - .getDynamicTaskProps(dynamicTask.getDynamicTaskId())); + .getDynamicTaskProps(dynamicTask.getDynamicTaskId(), tenantId)); } } - TaskManagementDAOFactory.commitTransaction(); } catch (TaskManagementDAOException e) { - TaskManagementDAOFactory.rollbackTransaction(); String msg = "Error occurred while fetching all dynamic tasks"; log.error(msg, e); throw new TaskManagementException(msg, e); @@ -322,20 +330,63 @@ public class TaskManagementServiceImpl implements TaskManagementService { } @Override - public DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementException { + public Map> getDynamicTasksForAllTenants() throws TaskManagementException { + List dynamicTasks; + try { + if (log.isTraceEnabled()) { + log.trace("Fetching the details of dynamic tasks for all tenants"); + } + TaskManagementDAOFactory.openConnection(); + dynamicTasks = dynamicTaskDAO.getAllDynamicTasks(); + if (dynamicTasks != null) { + for (DynamicTask dynamicTask : dynamicTasks) { + dynamicTask.setProperties(dynamicTaskPropDAO + .getDynamicTaskProps(dynamicTask.getDynamicTaskId(), dynamicTask.getTenantId())); + } + } + } catch (TaskManagementDAOException e) { + String msg = "Error occurred while fetching all dynamic tasks"; + log.error(msg, e); + throw new TaskManagementException(msg, e); + } catch (TransactionManagementException e) { + String msg = "Failed to start/open transaction to get all dynamic tasks"; + log.error(msg, e); + throw new TaskManagementException(msg, e); + } finally { + TaskManagementDAOFactory.closeConnection(); + } + Map> tenantedDynamicTasks = new HashMap<>(); + List dts; + if (dynamicTasks != null) { + for (DynamicTask dt : dynamicTasks) { + if (tenantedDynamicTasks.containsKey(dt.getTenantId())) { + dts = tenantedDynamicTasks.get(dt.getTenantId()); + } else { + dts = new ArrayList<>(); + } + dts.add(dt); + tenantedDynamicTasks.put(dt.getTenantId(), dts); + } + } + return tenantedDynamicTasks; + } + + @Override + public DynamicTask getDynamicTask(int dynamicTaskId) throws TaskManagementException { + int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(); DynamicTask dynamicTask; try { if (log.isDebugEnabled()) { log.debug("Fetching the details of dynamic task '" + dynamicTaskId + "'"); } - TaskManagementDAOFactory.beginTransaction(); - dynamicTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId); + TaskManagementDAOFactory.openConnection(); + dynamicTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId); if (dynamicTask != null) { - dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId())); + dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId(), + tenantId)); } TaskManagementDAOFactory.commitTransaction(); } catch (TaskManagementDAOException e) { - TaskManagementDAOFactory.rollbackTransaction(); String msg = "Error occurred while fetching dynamic task '" + dynamicTaskId + "'"; log.error(msg, e); throw new TaskManagementException(msg, e); @@ -351,21 +402,21 @@ public class TaskManagementServiceImpl implements TaskManagementService { @Override public List getActiveDynamicTasks() throws TaskManagementException { + int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId(); List dynamicTasks; try { if (log.isDebugEnabled()) { log.debug("Fetching the details of all active dynamic tasks"); } - TaskManagementDAOFactory.beginTransaction(); - dynamicTasks = dynamicTaskDAO.getActiveDynamicTasks(); + TaskManagementDAOFactory.openConnection(); + dynamicTasks = dynamicTaskDAO.getActiveDynamicTasks(tenantId); if (dynamicTasks != null) { for (DynamicTask dynamicTask : dynamicTasks) { - dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId())); + dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId(), + tenantId)); } } - TaskManagementDAOFactory.commitTransaction(); } catch (TaskManagementDAOException e) { - TaskManagementDAOFactory.rollbackTransaction(); String msg = "Error occurred while fetching all active dynamic tasks"; log.error(msg, e); throw new TaskManagementException(msg, e); @@ -380,18 +431,19 @@ public class TaskManagementServiceImpl implements TaskManagementService { } // check whether task exist in the ntask core - private boolean isTaskExists(String taskId) throws TaskManagementException, TaskException { - if (StringUtils.isEmpty(taskId)) { - String msg = "Task ID must not be null or empty."; + private boolean isTaskExists(String taskName) throws TaskManagementException, TaskException { + if (StringUtils.isEmpty(taskName)) { + String msg = "Task Name must not be null or empty."; log.error(msg); throw new TaskManagementException(msg); } List tasks = taskManager.getAllTasks(); for (TaskInfo t : tasks) { - if (taskId.equals(t.getName())) { + if (taskName.equals(t.getName())) { return true; } } return false; } + } diff --git a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/util/TaskManagementUtil.java b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/util/TaskManagementUtil.java index c060451a6b..2a284f3021 100755 --- a/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/util/TaskManagementUtil.java +++ b/components/task-mgt/task-manager/io.entgra.device.mgt.core.task.mgt.core/src/main/java/io/entgra/device/mgt/core/task/mgt/core/util/TaskManagementUtil.java @@ -17,20 +17,21 @@ */ package io.entgra.device.mgt.core.task.mgt.core.util; -import com.google.gson.Gson; import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; +import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask; import io.entgra.device.mgt.core.task.mgt.common.constant.TaskMgtConstants; import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementException; import io.entgra.device.mgt.core.task.mgt.core.internal.TaskManagerDataHolder; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.w3c.dom.Document; +import org.wso2.carbon.context.PrivilegedCarbonContext; import javax.xml.XMLConstants; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import java.io.File; +import java.util.HashMap; import java.util.Map; /** @@ -55,11 +56,11 @@ public class TaskManagementUtil { } } - public static String generateTaskId(int dynamicTaskId) throws TaskManagementException { + public static String generateNTaskName(int dynamicTaskId) throws TaskManagementException { try { int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService() .getServerCtxInfo().getLocalServerHashIdx(); - return generateTaskId(dynamicTaskId, serverHashIdx); + return generateNTaskName(dynamicTaskId, serverHashIdx); } catch (HeartBeatManagementException e) { String msg = "Failed to generate task id for a dynamic task " + dynamicTaskId; log.error(msg, e); @@ -67,18 +68,33 @@ public class TaskManagementUtil { } } - public static String generateTaskId(int dynamicTaskId, int serverHashIdx) { + public static String generateNTaskName(int dynamicTaskId, int serverHashIdx) { return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId + TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx; } - public static String generateTaskPropsMD5(Map taskProperties) { - taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP); - taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX); - taskProperties.remove(TaskMgtConstants.Task.LOCAL_TASK_NAME); - Gson gson = new Gson(); - String json = gson.toJson(taskProperties); - return DigestUtils.md5Hex(json); + public static Map populateNTaskProperties(DynamicTask dynamicTask, + String nTaskName) throws TaskManagementException { + try { + int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService() + .getServerCtxInfo().getLocalServerHashIdx(); + return populateNTaskProperties(dynamicTask, nTaskName, serverHashIdx); + } catch (HeartBeatManagementException e) { + String msg = "Failed to populate nTask properties a dynamic task " + dynamicTask.getDynamicTaskId(); + log.error(msg, e); + throw new TaskManagementException(msg, e); + } + } + + public static Map populateNTaskProperties(DynamicTask dynamicTask, + String nTaskName, int serverHashIdx) { + Map taskProperties = new HashMap<>(); + taskProperties.put(TaskMgtConstants.Task.DYNAMIC_TASK_ID, String.valueOf(dynamicTask.getDynamicTaskId())); + taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, nTaskName); + taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); + taskProperties.put(TaskMgtConstants.Task.TENANT_ID_PROP, + String.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId())); + return taskProperties; } } diff --git a/components/task-mgt/task-watcher/io.entgra.device.mgt.core.task.mgt.watcher/src/main/java/io/entgra/device/mgt/core/task/mgt/watcher/IoTSStartupHandler.java b/components/task-mgt/task-watcher/io.entgra.device.mgt.core.task.mgt.watcher/src/main/java/io/entgra/device/mgt/core/task/mgt/watcher/IoTSStartupHandler.java index 8d51a240cc..5b1945e686 100755 --- a/components/task-mgt/task-watcher/io.entgra.device.mgt.core.task.mgt.watcher/src/main/java/io/entgra/device/mgt/core/task/mgt/watcher/IoTSStartupHandler.java +++ b/components/task-mgt/task-watcher/io.entgra.device.mgt.core.task.mgt.watcher/src/main/java/io/entgra/device/mgt/core/task/mgt/watcher/IoTSStartupHandler.java @@ -38,7 +38,6 @@ import org.wso2.carbon.user.core.service.RealmService; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -69,47 +68,38 @@ public class IoTSStartupHandler implements ServerStartupObserver { private void compareTasks() { if (log.isDebugEnabled()) { - log.debug("Comparing Tasks from carbon nTask manager and entgra task manager"); + log.debug("Comparing Tasks from carbon nTask manager and Entgra task manager."); } TaskService nTaskService = TaskWatcherDataHolder.getInstance().getnTaskService(); if (nTaskService == null) { - String msg = "Unable to load TaskService from the carbon nTask core"; + String msg = "Unable to load TaskService from the carbon nTask core."; log.error(msg); return; } try { - List dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService() - .getAllDynamicTasks(); + Map> tenantedDynamicTasks = TaskWatcherDataHolder.getInstance() + .getTaskManagementService().getDynamicTasksForAllTenants(); - scheduleMissingTasks(nTaskService, dynamicTasks); - deleteObsoleteTasks(nTaskService, dynamicTasks); + scheduleMissingTasks(nTaskService, tenantedDynamicTasks); + deleteObsoleteTasks(nTaskService, tenantedDynamicTasks); if (log.isDebugEnabled()) { - log.debug("Task Comparison Completed and all tasks in current node are updated"); + log.debug("Task Comparison Completed and all tasks in current node are updated."); } } catch (TaskException e) { String msg = "Error occurred while accessing carbon nTask manager."; log.error(msg, e); } catch (TaskManagementException e) { - String msg = "Error occurred while retrieving all active tasks from entgra task manager"; + String msg = "Error occurred while retrieving all active tasks from Entgra task manager."; log.error(msg, e); } } - private static void scheduleMissingTasks(TaskService nTaskService, List dynamicTasks) + private static void scheduleMissingTasks(TaskService nTaskService, Map> tenantedDynamicTasks) throws TaskException, TaskManagementException { - Map> tenantedDynamicTasks = new HashMap<>(); - List dts; - for (DynamicTask dt : dynamicTasks) { - if (tenantedDynamicTasks.containsKey(dt.getTenantId())) { - dts = tenantedDynamicTasks.get(dt.getTenantId()); - } else { - dts = new ArrayList<>(); - } - dts.add(dt); - tenantedDynamicTasks.put(dt.getTenantId(), dts); - } + TaskManager taskManager; for (Integer tenantId : tenantedDynamicTasks.keySet()) { if (tenantId == -1) { @@ -126,36 +116,56 @@ public class IoTSStartupHandler implements ServerStartupObserver { List tasks = taskManager.getAllTasks(); // add or update task into nTask core for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) { - String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId()); + int serverHashIdx; + try { + serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService() + .getServerCtxInfo().getLocalServerHashIdx(); + } catch (HeartBeatManagementException e) { + String msg = "Failed to get server hash index for dynamic task " + dt.getDynamicTaskId(); + log.error(msg, e); + throw new TaskManagementException(msg, e); + } + + String nTaskName = TaskManagementUtil.generateNTaskName(dt.getDynamicTaskId(), serverHashIdx); boolean isExist = false; for (TaskInfo taskInfo : tasks) { - if (taskInfo.getName().equals(generatedTaskId)) { - isExist = true; + if (taskInfo.getName().equals(nTaskName)) { + TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo(); - String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(dt.getProperties()); - String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties()); - if (!triggerInfo.getCronExpression().equals(dt.getCronExpression()) - || !dynamicTaskPropMD5.equals(existingTaskPropMD5)) { + if (taskInfo.getProperties() == null) { + String msg = "Task properties not found for task " + nTaskName + + ". Therefore deleting the nTask schedule."; + log.warn(msg); + taskManager.deleteTask(nTaskName); + break; + } + + isExist = true; + if (!triggerInfo.getCronExpression().equals(dt.getCronExpression())) { triggerInfo.setCronExpression(dt.getCronExpression()); taskInfo.setTriggerInfo(triggerInfo); - taskInfo.setProperties(populateTaskProperties(tenantId, generatedTaskId, dt.getProperties())); + taskInfo.setProperties(TaskManagementUtil + .populateNTaskProperties(dt, taskInfo.getName(), serverHashIdx)); taskManager.registerTask(taskInfo); - taskManager.rescheduleTask(generatedTaskId); + taskManager.rescheduleTask(nTaskName); if (log.isDebugEnabled()) { - log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table"); + log.debug("Task - '" + nTaskName + + "' updated according to the dynamic task table."); } } if (dt.isEnabled() - && taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) { - taskManager.resumeTask(generatedTaskId); + && taskManager.getTaskState(nTaskName) == TaskManager.TaskState.PAUSED) { + taskManager.resumeTask(nTaskName); if (log.isDebugEnabled()) { - log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table"); + log.debug("Task - '" + nTaskName + + "' enabled according to the dynamic task table."); } } else if (!dt.isEnabled() - && taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) { - taskManager.pauseTask(generatedTaskId); + && taskManager.getTaskState(nTaskName) != TaskManager.TaskState.PAUSED) { + taskManager.pauseTask(nTaskName); if (log.isDebugEnabled()) { - log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table"); + log.debug("Task - '" + nTaskName + + "' disabled according to the dynamic task table."); } } break; @@ -164,12 +174,12 @@ public class IoTSStartupHandler implements ServerStartupObserver { if (!isExist) { TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); triggerInfo.setCronExpression(dt.getCronExpression()); - TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(), - populateTaskProperties(tenantId, generatedTaskId, dt.getProperties()), triggerInfo); + TaskInfo taskInfo = new TaskInfo(nTaskName, dt.getTaskClassName(), TaskManagementUtil + .populateNTaskProperties(dt, nTaskName, serverHashIdx), triggerInfo); taskManager.registerTask(taskInfo); - taskManager.scheduleTask(generatedTaskId); + taskManager.scheduleTask(nTaskName); if (log.isDebugEnabled()) { - log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table"); + log.debug("New task -'" + nTaskName + "' created according to the dynamic task table."); } } } @@ -177,24 +187,8 @@ public class IoTSStartupHandler implements ServerStartupObserver { } } - private static Map populateTaskProperties(int tenantId, String generatedTaskId, - Map taskProperties) - throws TaskManagementException { - try { - int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService() - .getServerCtxInfo().getLocalServerHashIdx(); - taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); - taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId); - taskProperties.put(TaskMgtConstants.Task.TENANT_ID_PROP, String.valueOf(tenantId)); - return taskProperties; - } catch (HeartBeatManagementException e) { - String msg = "Unexpected exception when getting server hash index."; - log.error(msg, e); - throw new TaskManagementException(msg, e); - } - } - - private static void deleteObsoleteTasks(TaskService nTaskService, List dynamicTasks) + private static void deleteObsoleteTasks(TaskService nTaskService, + Map> tenantedDynamicTasks) throws TaskManagementException, TaskException { List tenants = new ArrayList<>(); @@ -224,6 +218,13 @@ public class IoTSStartupHandler implements ServerStartupObserver { } for (Tenant tenant : tenants) { + if (tenantedDynamicTasks.get(tenant.getId()) == null) { + if (log.isTraceEnabled()) { + log.trace("Dynamic tasks not running for tenant: [" + tenant.getId() + "] " + + tenant.getDomain()); + } + continue; + } PrivilegedCarbonContext.startTenantFlow(); PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true); if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { @@ -234,10 +235,10 @@ public class IoTSStartupHandler implements ServerStartupObserver { // Remove deleted items from the nTask core for (TaskInfo taskInfo : tasks) { boolean isExist = false; - for (DynamicTask dt : dynamicTasks) { + for (DynamicTask dt : tenantedDynamicTasks.get(tenant.getId())) { for (int hid : hashIds) { if (tenant.getId() == dt.getTenantId() && - taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId(), hid))) { + taskInfo.getName().equals(TaskManagementUtil.generateNTaskName(dt.getDynamicTaskId(), hid))) { isExist = true; break; } diff --git a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/h2.sql b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/h2.sql index 43e57cf430..b6dcb7d5be 100644 --- a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/h2.sql +++ b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/h2.sql @@ -782,9 +782,9 @@ CREATE TABLE IF NOT EXISTS DM_EXT_PERMISSION_MAPPING ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( DYNAMIC_TASK_ID INTEGER AUTO_INCREMENT NOT NULL, NAME VARCHAR(300) DEFAULT NULL , - CRON VARCHAR(8000) DEFAULT NULL, + CRON VARCHAR(100) DEFAULT NULL, IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE, - TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL, + TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL, TENANT_ID INTEGER DEFAULT 0, PRIMARY KEY (DYNAMIC_TASK_ID) ); @@ -792,8 +792,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES ( DYNAMIC_TASK_ID INTEGER NOT NULL, PROPERTY_NAME VARCHAR(100) DEFAULT 0, - PROPERTY_VALUE VARCHAR(100) DEFAULT NULL, - TENANT_ID VARCHAR(100), + PROPERTY_VALUE VARCHAR(8000) DEFAULT NULL, + TENANT_ID INTEGER DEFAULT 0, PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID), CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE diff --git a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mssql.sql b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mssql.sql index 318dfd2a76..cee731c034 100644 --- a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mssql.sql +++ b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mssql.sql @@ -852,10 +852,10 @@ CREATE TABLE DM_GEOFENCE_EVENT_MAPPING ( IF NOT EXISTS (SELECT * FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'[DBO].[DYNAMIC_TASK]') AND TYPE IN (N'U')) CREATE TABLE DYNAMIC_TASK ( DYNAMIC_TASK_ID INTEGER IDENTITY(1,1) NOT NULL, - NAME VARCHAR(255) DEFAULT NULL , - CRON VARCHAR(8000) DEFAULT NULL, + NAME VARCHAR(300) DEFAULT NULL , + CRON VARCHAR(100) DEFAULT NULL, IS_ENABLED BIT NOT NULL DEFAULT 0, - TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL, + TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL, TENANT_ID INTEGER DEFAULT 0, PRIMARY KEY (DYNAMIC_TASK_ID) ); @@ -864,8 +864,8 @@ IF NOT EXISTS (SELECT * FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'[DBO].[D CREATE TABLE DYNAMIC_TASK_PROPERTIES ( DYNAMIC_TASK_ID INTEGER NOT NULL, PROPERTY_NAME VARCHAR(100) DEFAULT 0, - PROPERTY_VALUE VARCHAR(100) DEFAULT NULL, - TENANT_ID VARCHAR(100), + PROPERTY_VALUE VARCHAR(8000) DEFAULT NULL, + TENANT_ID INTEGER DEFAULT 0, PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID), CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE diff --git a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mysql.sql b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mysql.sql index b0de3dc590..7ddaf22af6 100644 --- a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mysql.sql +++ b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/mysql.sql @@ -853,9 +853,9 @@ CREATE TABLE IF NOT EXISTS DM_EXT_PERMISSION_MAPPING ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( DYNAMIC_TASK_ID INTEGER AUTO_INCREMENT NOT NULL, NAME VARCHAR(300) DEFAULT NULL , - CRON VARCHAR(8000) DEFAULT NULL, + CRON VARCHAR(100) DEFAULT NULL, IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE, - TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL, + TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL, TENANT_ID INTEGER DEFAULT 0, PRIMARY KEY (DYNAMIC_TASK_ID) ) ENGINE=InnoDB; @@ -863,8 +863,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES ( DYNAMIC_TASK_ID INTEGER NOT NULL, PROPERTY_NAME VARCHAR(100) DEFAULT 0, - PROPERTY_VALUE VARCHAR(100) DEFAULT NULL, - TENANT_ID VARCHAR(100), + PROPERTY_VALUE TEXT DEFAULT NULL, + TENANT_ID INTEGER DEFAULT 0, PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID), CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE diff --git a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/oracle.sql b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/oracle.sql index d632be52aa..6e99bb43ee 100644 --- a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/oracle.sql +++ b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/oracle.sql @@ -1127,9 +1127,9 @@ CREATE TABLE DM_GEOFENCE ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( DYNAMIC_TASK_ID NUMBER(10) NOT NULL, NAME VARCHAR2(300) DEFAULT NULL , - CRON VARCHAR2(8000) DEFAULT NULL, + CRON VARCHAR2(100) DEFAULT NULL, IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE, - TASK_CLASS_NAME VARCHAR2(8000) DEFAULT NULL, + TASK_CLASS_NAME VARCHAR2(1000) DEFAULT NULL, TENANT_ID INTEGER DEFAULT 0, CONSTRAINT PK_DYNAMIC_TASK PRIMARY KEY (DYNAMIC_TASK_ID) ) ENGINE=InnoDB; @@ -1137,8 +1137,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES ( DYNAMIC_TASK_ID INTEGER NOT NULL, PROPERTY_NAME VARCHAR2(100) DEFAULT 0, - PROPERTY_VALUE VARCHAR2(100) DEFAULT NULL, - TENANT_ID VARCHAR2(100), + PROPERTY_VALUE VARCHAR2(8000) DEFAULT NULL, + TENANT_ID INTEGER DEFAULT 0, CONSTRAINT PK_DYNAMIC_TASK_PROPERTIES PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID), CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE diff --git a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/postgresql.sql b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/postgresql.sql index bc22e6db63..5d5c7446e6 100644 --- a/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/postgresql.sql +++ b/features/device-mgt/io.entgra.device.mgt.core.device.mgt.basics.feature/src/main/resources/dbscripts/cdm/postgresql.sql @@ -772,9 +772,9 @@ CREATE TABLE IF NOT EXISTS DM_GEOFENCE ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( DYNAMIC_TASK_ID INTEGER DEFAULT NEXTVAL ('DYNAMIC_TASK_seq') NOT NULL, NAME VARCHAR(300) DEFAULT NULL , - CRON VARCHAR(8000) DEFAULT NULL, + CRON VARCHAR(100) DEFAULT NULL, IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE, - TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL, + TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL, TENANT_ID INTEGER DEFAULT 0, PRIMARY KEY (DYNAMIC_TASK_ID) ) ENGINE=InnoDB; @@ -782,8 +782,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK ( CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES ( DYNAMIC_TASK_ID INTEGER NOT NULL, PROPERTY_NAME VARCHAR(100) DEFAULT 0, - PROPERTY_VALUE VARCHAR(100) DEFAULT NULL, - TENANT_ID VARCHAR(100), + PROPERTY_VALUE TEXT DEFAULT NULL, + TENANT_ID INTEGER DEFAULT 0, PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID), CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE