diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/pom.xml b/components/device-mgt/org.wso2.carbon.device.mgt.core/pom.xml index 2572bc79a9..a55ae5faff 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/pom.xml +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/pom.xml @@ -103,6 +103,7 @@ org.wso2.carbon.ndatasource.core, org.wso2.carbon.ntask.core.*, org.wso2.carbon.ntask.common, + io.entgra.task.mgt.common.*, org.apache.commons.collections;version="${commons-collections.version.range}", org.wso2.carbon.email.sender.*, io.swagger.annotations.*;resolution:=optional, @@ -347,6 +348,10 @@ okhttp compile + + org.wso2.carbon.devicemgt + io.entgra.task.mgt.common + diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java index 2ee7e32032..c1bd71a2a5 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/operation/timeout/task/impl/OperationTimeoutTask.java @@ -32,31 +32,10 @@ import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask; import java.util.ArrayList; import java.util.List; -import java.util.Map; public class OperationTimeoutTask extends DynamicPartitionedScheduleTask { private static final Log log = LogFactory.getLog(OperationTimeoutTask.class); - private OperationTimeout operationTimeoutConfig; - - @Override - public void setProperties(Map properties) { - super.setProperties(properties); - String operationTimeoutTaskConfigStr = properties - .get(OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG); - Gson gson = new Gson(); - operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class); - } - - @Override - public String getProperty(String name) { - return super.getProperty(name); - } - - @Override - public void refreshContext() { - super.refreshContext(); - } @Override protected void setup() { @@ -65,12 +44,15 @@ public class OperationTimeoutTask extends DynamicPartitionedScheduleTask { @Override protected void executeDynamicTask() { + String operationTimeoutTaskConfigStr = getProperty( + OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG); + Gson gson = new Gson(); + OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class); try { - long timeMillis = System.currentTimeMillis() - operationTimeoutConfig.getTimeout() * 60 * 1000; List deviceTypes = new ArrayList<>(); if (operationTimeoutConfig.getDeviceTypes().size() == 1 && - "ALL".equals(operationTimeoutConfig.getDeviceTypes().get( 0))) { + "ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) { try { List deviceTypeList = DeviceManagementDataHolder.getInstance() .getDeviceManagementProvider().getDeviceTypes(); diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java index 5182fc3651..9d2e1ee7ce 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java @@ -37,7 +37,6 @@ import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; -import java.util.Map; /** * This implements the Task service which monitors the device activity periodically & update the device-status if @@ -47,19 +46,8 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask { private static final Log log = LogFactory.getLog(DeviceStatusMonitoringTask.class); private String deviceType; - private DeviceStatusTaskPluginConfig deviceStatusTaskPluginConfig; private int deviceTypeId = -1; - @Override - public void setProperties(Map properties) { - super.setProperties(properties); - deviceType = properties.get(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE); - deviceTypeId = Integer.parseInt(properties.get(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE_ID)); - String deviceStatusTaskConfigStr = properties.get(DeviceStatusTaskManagerServiceImpl.DEVICE_STATUS_TASK_CONFIG); - Gson gson = new Gson(); - deviceStatusTaskPluginConfig = gson.fromJson(deviceStatusTaskConfigStr, DeviceStatusTaskPluginConfig.class); - } - @Override protected void setup() { } @@ -92,6 +80,11 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask { @Override public void executeDynamicTask() { + deviceType = getProperty(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE); + deviceTypeId = Integer.parseInt(getProperty(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE_ID)); + String deviceStatusTaskConfigStr = getProperty(DeviceStatusTaskManagerServiceImpl.DEVICE_STATUS_TASK_CONFIG); + Gson gson = new Gson(); + DeviceStatusTaskPluginConfig deviceStatusTaskPluginConfig = gson.fromJson(deviceStatusTaskConfigStr, DeviceStatusTaskPluginConfig.class); try { List enrolmentInfoTobeUpdated = new ArrayList<>(); List allDevicesForMonitoring = getAllDevicesForMonitoring(); @@ -102,10 +95,10 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask { EnrolmentInfo enrolmentInfo = monitoringData.getDevice().getEnrolmentInfo(); EnrolmentInfo.Status status = null; - if (lastUpdatedTime >= this.deviceStatusTaskPluginConfig + if (lastUpdatedTime >= deviceStatusTaskPluginConfig .getIdleTimeToMarkInactive()) { status = EnrolmentInfo.Status.INACTIVE; - } else if (lastUpdatedTime >= this.deviceStatusTaskPluginConfig + } else if (lastUpdatedTime >= deviceStatusTaskPluginConfig .getIdleTimeToMarkUnreachable()) { status = EnrolmentInfo.Status.UNREACHABLE; } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java index 0c6f77c15d..a13031eabb 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java @@ -48,7 +48,6 @@ import org.wso2.carbon.device.mgt.core.task.DeviceMgtTaskException; import org.wso2.carbon.device.mgt.core.task.DeviceTaskManager; import java.util.List; -import java.util.Map; public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask { @@ -56,14 +55,9 @@ public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask { private String deviceType; private DeviceManagementProviderService deviceManagementProviderService; - @Override - public void setProperties(Map map) { - super.setProperties(map); - deviceType = map.get("DEVICE_TYPE"); - } - @Override public void executeDynamicTask() { + deviceType = getProperty("DEVICE_TYPE"); deviceManagementProviderService = DeviceManagementDataHolder.getInstance() .getDeviceManagementProvider(); OperationMonitoringTaskConfig operationMonitoringTaskConfig = deviceManagementProviderService diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java index 0bbb59d2bd..6ce1ac5b6b 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java @@ -19,10 +19,11 @@ package org.wso2.carbon.device.mgt.core.task.impl; import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; +import io.entgra.task.mgt.common.constant.TaskMgtConstants; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.wso2.carbon.device.mgt.common.ServerCtxInfo; import org.wso2.carbon.device.mgt.common.DynamicTaskContext; +import org.wso2.carbon.device.mgt.common.ServerCtxInfo; import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder; import org.wso2.carbon.ntask.core.Task; @@ -37,11 +38,11 @@ public abstract class DynamicPartitionedScheduleTask implements Task { private Map properties; @Override - public void setProperties(Map properties) { + public final void setProperties(Map properties) { this.properties = properties; } - public String getProperty(String name) { + public final String getProperty(String name) { if (properties == null) { return null; } @@ -62,7 +63,7 @@ public abstract class DynamicPartitionedScheduleTask implements Task { } } } catch (HeartBeatManagementException e) { - log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function." , e); + log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.", e); } setup(); } @@ -70,11 +71,40 @@ public abstract class DynamicPartitionedScheduleTask implements Task { @Override public final void execute() { refreshContext(); - executeDynamicTask(); + if (taskContext != null && taskContext.isPartitioningEnabled()) { + String localHashIndex = getProperty(TaskMgtConstants.Task.LOCAL_HASH_INDEX); + // These tasks are not dynamically scheduled. They are added via a config so scheduled in each node + // during the server startup + if (localHashIndex == null ) { + if (log.isDebugEnabled()) { + log.debug("Executing startup scheduled task (" + getTaskName() + ")"); + } + executeDynamicTask(); + return; + } + if (localHashIndex.equals(String.valueOf(taskContext.getServerHashIndex()))) { + if (log.isDebugEnabled()) { + log.debug("Executing dynamically scheduled task (" + getTaskName() + + ") for current server hash index: " + localHashIndex); + } + executeDynamicTask(); + } else { + if (log.isDebugEnabled()) { + log.debug("Ignoring execution of task (" + getTaskName() + + ") not belonging to current serer hash index: " + localHashIndex); + } + } + } else { + executeDynamicTask(); + } + } + + public String getTaskName() { + return getProperty(TaskMgtConstants.Task.LOCAL_TASK_NAME); } - public void refreshContext(){ - if(taskContext != null && taskContext.isPartitioningEnabled()) { + public void refreshContext() { + if (taskContext != null && taskContext.isPartitioningEnabled()) { try { updateContext(); } catch (HeartBeatManagementException e) { diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.common/src/main/java/io/entgra/task/mgt/common/constant/TaskMgtConstant.java b/components/task-mgt/task-manager/io.entgra.task.mgt.common/src/main/java/io/entgra/task/mgt/common/constant/TaskMgtConstants.java similarity index 87% rename from components/task-mgt/task-manager/io.entgra.task.mgt.common/src/main/java/io/entgra/task/mgt/common/constant/TaskMgtConstant.java rename to components/task-mgt/task-manager/io.entgra.task.mgt.common/src/main/java/io/entgra/task/mgt/common/constant/TaskMgtConstants.java index 5dcc03bd1b..2ffe960964 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.common/src/main/java/io/entgra/task/mgt/common/constant/TaskMgtConstant.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.common/src/main/java/io/entgra/task/mgt/common/constant/TaskMgtConstants.java @@ -17,7 +17,7 @@ */ package io.entgra.task.mgt.common.constant; -public class TaskMgtConstant { +public class TaskMgtConstants { public static final class DataSourceProperties { private DataSourceProperties() { throw new AssertionError(); @@ -46,7 +46,8 @@ public class TaskMgtConstant { public static final String NAME_SEPARATOR = "_"; public static final String PROPERTY_KEY_COLUMN_NAME = "PROPERTY_NAME"; public static final String PROPERTY_VALUE_COLUMN_NAME = "PROPERTY_VALUE"; - public static final String __TENANT_ID_PROP__ = "__TENANT_ID_PROP__"; - + public static final String TENANT_ID_PROP = "__TENANT_ID_PROP__"; + public static final String LOCAL_HASH_INDEX = "__LOCAL_HASH_INDEX__"; + public static final String LOCAL_TASK_NAME = "__LOCAL_TASK_NAME__"; } } diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/pom.xml b/components/task-mgt/task-manager/io.entgra.task.mgt.core/pom.xml index 5202f25781..f0ffd6c1f5 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/pom.xml +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/pom.xml @@ -63,10 +63,12 @@ javax.naming, io.entgra.task.mgt.common.*, org.wso2.carbon.utils.*, - org.wso2.carbon.ntask.*, - org.wso2.carbon.device.mgt.core.*, + org.wso2.carbon.ntask.core.*, + org.wso2.carbon.ntask.common, org.wso2.carbon.device.mgt.common.*, org.wso2.carbon.context, + org.apache.commons.codec.binary;version="${commons-codec.wso2.osgi.version.range}", + org.apache.commons.codec.digest;version="${commons-codec.wso2.osgi.version.range}", io.entgra.server.bootup.heartbeat.beacon.dto, io.entgra.server.bootup.heartbeat.beacon.exception, io.entgra.server.bootup.heartbeat.beacon.service, @@ -113,8 +115,8 @@ provided - org.wso2.carbon.devicemgt - org.wso2.carbon.device.mgt.core + commons-codec.wso2 + commons-codec provided @@ -127,6 +129,13 @@ io.entgra.server.bootup.heartbeat.beacon provided + + + + org.wso2.carbon.commons + org.wso2.carbon.ntask.core + provided + \ No newline at end of file diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/config/TaskConfigurationManager.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/config/TaskConfigurationManager.java index 9700465a1d..2779a07ff9 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/config/TaskConfigurationManager.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/config/TaskConfigurationManager.java @@ -17,7 +17,7 @@ */ package io.entgra.task.mgt.core.config; -import io.entgra.task.mgt.common.constant.TaskMgtConstant; +import io.entgra.task.mgt.common.constant.TaskMgtConstants; import io.entgra.task.mgt.common.exception.TaskManagementException; import io.entgra.task.mgt.core.util.TaskManagementUtil; import org.w3c.dom.Document; @@ -38,7 +38,7 @@ public class TaskConfigurationManager { private static final String TASK_MGT_CONFIG_PATH = CarbonUtils.getCarbonConfigDirPath() + File.separator + - TaskMgtConstant.DataSourceProperties.TASK_CONFIG_XML_NAME; + TaskMgtConstants.DataSourceProperties.TASK_CONFIG_XML_NAME; public static TaskConfigurationManager getInstance() { if (taskConfigurationManager == null) { diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/common/TaskManagementDAOFactory.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/common/TaskManagementDAOFactory.java index c5ef61db2a..c8f8623777 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/common/TaskManagementDAOFactory.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/common/TaskManagementDAOFactory.java @@ -17,7 +17,7 @@ */ package io.entgra.task.mgt.core.dao.common; -import io.entgra.task.mgt.common.constant.TaskMgtConstant; +import io.entgra.task.mgt.common.constant.TaskMgtConstants; import io.entgra.task.mgt.common.exception.IllegalTransactionStateException; import io.entgra.task.mgt.common.exception.TransactionManagementException; import io.entgra.task.mgt.common.exception.UnsupportedDatabaseEngineException; @@ -25,9 +25,9 @@ import io.entgra.task.mgt.core.config.datasource.DataSourceConfig; import io.entgra.task.mgt.core.config.datasource.JNDILookupDefinition; import io.entgra.task.mgt.core.dao.DynamicTaskDAO; import io.entgra.task.mgt.core.dao.DynamicTaskPropDAO; -import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil; import io.entgra.task.mgt.core.dao.impl.DynamicTaskDAOImpl; import io.entgra.task.mgt.core.dao.impl.DynamicTaskPropDAOImpl; +import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,8 +48,8 @@ public class TaskManagementDAOFactory { public static DynamicTaskDAO getDynamicTaskDAO() { if (databaseEngine != null) { switch (databaseEngine) { - case TaskMgtConstant.DataBaseTypes.DB_TYPE_H2: - case TaskMgtConstant.DataBaseTypes.DB_TYPE_MYSQL: + case TaskMgtConstants.DataBaseTypes.DB_TYPE_H2: + case TaskMgtConstants.DataBaseTypes.DB_TYPE_MYSQL: return new DynamicTaskDAOImpl(); default: throw new UnsupportedDatabaseEngineException("Unsupported database engine : " + databaseEngine); @@ -61,8 +61,8 @@ public class TaskManagementDAOFactory { public static DynamicTaskPropDAO getDynamicTaskPropDAO() { if (databaseEngine != null) { switch (databaseEngine) { - case TaskMgtConstant.DataBaseTypes.DB_TYPE_H2: - case TaskMgtConstant.DataBaseTypes.DB_TYPE_MYSQL: + case TaskMgtConstants.DataBaseTypes.DB_TYPE_H2: + case TaskMgtConstants.DataBaseTypes.DB_TYPE_MYSQL: return new DynamicTaskPropDAOImpl(); default: throw new UnsupportedDatabaseEngineException("Unsupported database engine : " + databaseEngine); diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/impl/DynamicTaskPropDAOImpl.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/impl/DynamicTaskPropDAOImpl.java index a6e02a96c6..66d6920feb 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/impl/DynamicTaskPropDAOImpl.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/impl/DynamicTaskPropDAOImpl.java @@ -17,15 +17,14 @@ */ package io.entgra.task.mgt.core.dao.impl; -import io.entgra.task.mgt.common.constant.TaskMgtConstant; +import io.entgra.task.mgt.common.constant.TaskMgtConstants; import io.entgra.task.mgt.common.exception.TaskManagementDAOException; import io.entgra.task.mgt.core.dao.DynamicTaskPropDAO; -import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil; import io.entgra.task.mgt.core.dao.common.TaskManagementDAOFactory; +import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.wso2.carbon.context.PrivilegedCarbonContext; -import org.wso2.carbon.device.mgt.core.dao.util.DeviceManagementDAOUtil; import java.sql.Connection; import java.sql.PreparedStatement; @@ -79,8 +78,8 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO { resultSet = stmt.executeQuery(); properties = new HashMap<>(); while (resultSet.next()) { - properties.put(resultSet.getString(TaskMgtConstant.Task.PROPERTY_KEY_COLUMN_NAME) - , resultSet.getString(TaskMgtConstant.Task.PROPERTY_VALUE_COLUMN_NAME)); + properties.put(resultSet.getString(TaskMgtConstants.Task.PROPERTY_KEY_COLUMN_NAME) + , resultSet.getString(TaskMgtConstants.Task.PROPERTY_VALUE_COLUMN_NAME)); } } catch (SQLException e) { String msg = "Error occurred while fetching task properties of : '" + dynamicTaskId + "'"; @@ -119,7 +118,7 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO { throw new TaskManagementDAOException ("Error occurred while updating device properties to database.", e); } finally { - DeviceManagementDAOUtil.cleanupResources(stmt, null); + TaskManagementDAOUtil.cleanupResources(stmt, null); } } } diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/util/TaskManagementDAOUtil.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/util/TaskManagementDAOUtil.java index 843fc178aa..c9ba9ea07a 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/util/TaskManagementDAOUtil.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/dao/util/TaskManagementDAOUtil.java @@ -20,7 +20,6 @@ package io.entgra.task.mgt.core.dao.util; import io.entgra.task.mgt.common.bean.DynamicTask; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.wso2.carbon.device.mgt.common.Device; import javax.naming.InitialContext; import javax.sql.DataSource; diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java index 3c053fa178..e2deb15777 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/service/TaskManagementServiceImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, Entgra Pvt Ltd. (http://www.wso2.org) All Rights Reserved. + * Copyright (c) 2023, Entgra Pvt Ltd. (https://entgra.io) All Rights Reserved. * * Entgra Pvt Ltd. licenses this file to you under the Apache License, * Version 2.0 (the "License"); you may not use this file except @@ -17,11 +17,12 @@ */ package io.entgra.task.mgt.core.service; +import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import io.entgra.task.mgt.common.bean.DynamicTask; -import io.entgra.task.mgt.common.constant.TaskMgtConstant; -import io.entgra.task.mgt.common.exception.TaskNotFoundException; +import io.entgra.task.mgt.common.constant.TaskMgtConstants; import io.entgra.task.mgt.common.exception.TaskManagementDAOException; import io.entgra.task.mgt.common.exception.TaskManagementException; +import io.entgra.task.mgt.common.exception.TaskNotFoundException; import io.entgra.task.mgt.common.exception.TransactionManagementException; import io.entgra.task.mgt.common.spi.TaskManagementService; import io.entgra.task.mgt.core.dao.DynamicTaskDAO; @@ -62,13 +63,13 @@ public class TaskManagementServiceImpl implements TaskManagementService { throw new TaskManagementException(msg); } if (!nTaskService.getRegisteredTaskTypes().contains( - TaskMgtConstant.Task.DYNAMIC_TASK_TYPE)) { + TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { try { - nTaskService.registerTaskType(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE); - this.taskManager = nTaskService.getTaskManager(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE); + nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); + this.taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); } catch (TaskException e) { String msg = "Error occurred while registering task type [" - + TaskMgtConstant.Task.DYNAMIC_TASK_TYPE + + TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + "], hence unable to schedule the task."; log.error(msg); throw new TaskManagementException(msg, e); @@ -90,6 +91,17 @@ public class TaskManagementServiceImpl implements TaskManagementService { // add into the ntask core taskId = TaskManagementUtil.generateTaskId(dynamicTaskId); + try { + int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService() + .getServerCtxInfo().getLocalServerHashIdx(); + taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx)); + taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, taskId); + } catch (HeartBeatManagementException e) { + String msg = "Unexpected exception when getting server hash index."; + log.error(msg, e); + throw new TaskManagementException(msg, e); + } + if (!isTaskExists(taskId)) { TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo(); triggerInfo.setCronExpression(dynamicTask.getCronExpression()); @@ -101,7 +113,7 @@ public class TaskManagementServiceImpl implements TaskManagementService { } } else { String msg = "Task '" + taskId + "' is already exists in the ntask core " - + "Hence cannot create another task for the same."; + + "Hence not creating another task for the same name."; log.error(msg); } diff --git a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java index 5b22aa7d6b..fdfdaeaf4f 100755 --- a/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java +++ b/components/task-mgt/task-manager/io.entgra.task.mgt.core/src/main/java/io/entgra/task/mgt/core/util/TaskManagementUtil.java @@ -19,7 +19,7 @@ package io.entgra.task.mgt.core.util; import com.google.gson.Gson; import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; -import io.entgra.task.mgt.common.constant.TaskMgtConstant; +import io.entgra.task.mgt.common.constant.TaskMgtConstants; import io.entgra.task.mgt.common.exception.TaskManagementException; import io.entgra.task.mgt.core.internal.TaskManagerDataHolder; import org.apache.commons.codec.digest.DigestUtils; @@ -31,7 +31,6 @@ import javax.xml.XMLConstants; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import java.io.File; -import java.io.IOException; import java.util.Map; /** @@ -60,8 +59,8 @@ public class TaskManagementUtil { try { int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService() .getServerCtxInfo().getLocalServerHashIdx(); - return TaskMgtConstant.Task.DYNAMIC_TASK_TYPE + TaskMgtConstant.Task.NAME_SEPARATOR + dynamicTaskId - + TaskMgtConstant.Task.NAME_SEPARATOR + serverHashIdx; + return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId + + TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx; } catch (HeartBeatManagementException e) { String msg = "Failed to generate task id for a dynamic task " + dynamicTaskId; log.error(msg, e); @@ -70,11 +69,12 @@ public class TaskManagementUtil { } public static String generateTaskPropsMD5(Map taskProperties) throws TaskManagementException { - if (taskProperties.containsKey(TaskMgtConstant.Task.__TENANT_ID_PROP__)) { - taskProperties.remove(TaskMgtConstant.Task.__TENANT_ID_PROP__); - } + taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP); + taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX); + taskProperties.remove(TaskMgtConstants.Task.LOCAL_TASK_NAME); Gson gson = new Gson(); String json = gson.toJson(taskProperties); return DigestUtils.md5Hex(json); } + } diff --git a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java index 611cb8e808..824cb57295 100755 --- a/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java +++ b/components/task-mgt/task-watcher/io.entgra.task.mgt.watcher/src/main/java/io/entgra/task/mgt/watcher/IoTSStartupHandler.java @@ -20,7 +20,7 @@ package io.entgra.task.mgt.watcher; import io.entgra.task.mgt.common.bean.DynamicTask; -import io.entgra.task.mgt.common.constant.TaskMgtConstant; +import io.entgra.task.mgt.common.constant.TaskMgtConstants; import io.entgra.task.mgt.common.exception.TaskManagementException; import io.entgra.task.mgt.core.util.TaskManagementUtil; import io.entgra.task.mgt.watcher.internal.TaskWatcherDataHolder; @@ -64,10 +64,10 @@ public class IoTSStartupHandler implements ServerStartupObserver { } try { if (!nTaskService.getRegisteredTaskTypes().contains( - TaskMgtConstant.Task.DYNAMIC_TASK_TYPE)) { - nTaskService.registerTaskType(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE); + TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) { + nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); } - taskManager = nTaskService.getTaskManager(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE); + taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE); List dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService() .getAllDynamicTasks();