Enhance dynamic task execution functionality

improvement/fix-task-deletion
Charitha Goonetilleke 2 years ago
parent ce0d834b12
commit 78f8e80718

@ -103,6 +103,7 @@
org.wso2.carbon.ndatasource.core,
org.wso2.carbon.ntask.core.*,
org.wso2.carbon.ntask.common,
io.entgra.task.mgt.common.*,
org.apache.commons.collections;version="${commons-collections.version.range}",
org.wso2.carbon.email.sender.*,
io.swagger.annotations.*;resolution:=optional,
@ -347,6 +348,10 @@
<artifactId>okhttp</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>io.entgra.task.mgt.common</artifactId>
</dependency>
</dependencies>
</project>

@ -32,31 +32,10 @@ import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class OperationTimeoutTask extends DynamicPartitionedScheduleTask {
private static final Log log = LogFactory.getLog(OperationTimeoutTask.class);
private OperationTimeout operationTimeoutConfig;
@Override
public void setProperties(Map<String, String> properties) {
super.setProperties(properties);
String operationTimeoutTaskConfigStr = properties
.get(OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG);
Gson gson = new Gson();
operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class);
}
@Override
public String getProperty(String name) {
return super.getProperty(name);
}
@Override
public void refreshContext() {
super.refreshContext();
}
@Override
protected void setup() {
@ -65,8 +44,11 @@ public class OperationTimeoutTask extends DynamicPartitionedScheduleTask {
@Override
protected void executeDynamicTask() {
String operationTimeoutTaskConfigStr = getProperty(
OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG);
Gson gson = new Gson();
OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class);
try {
long timeMillis = System.currentTimeMillis() - operationTimeoutConfig.getTimeout() * 60 * 1000;
List<String> deviceTypes = new ArrayList<>();
if (operationTimeoutConfig.getDeviceTypes().size() == 1 &&

@ -37,7 +37,6 @@ import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* This implements the Task service which monitors the device activity periodically & update the device-status if
@ -47,19 +46,8 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask {
private static final Log log = LogFactory.getLog(DeviceStatusMonitoringTask.class);
private String deviceType;
private DeviceStatusTaskPluginConfig deviceStatusTaskPluginConfig;
private int deviceTypeId = -1;
@Override
public void setProperties(Map<String, String> properties) {
super.setProperties(properties);
deviceType = properties.get(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE);
deviceTypeId = Integer.parseInt(properties.get(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE_ID));
String deviceStatusTaskConfigStr = properties.get(DeviceStatusTaskManagerServiceImpl.DEVICE_STATUS_TASK_CONFIG);
Gson gson = new Gson();
deviceStatusTaskPluginConfig = gson.fromJson(deviceStatusTaskConfigStr, DeviceStatusTaskPluginConfig.class);
}
@Override
protected void setup() {
}
@ -92,6 +80,11 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask {
@Override
public void executeDynamicTask() {
deviceType = getProperty(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE);
deviceTypeId = Integer.parseInt(getProperty(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE_ID));
String deviceStatusTaskConfigStr = getProperty(DeviceStatusTaskManagerServiceImpl.DEVICE_STATUS_TASK_CONFIG);
Gson gson = new Gson();
DeviceStatusTaskPluginConfig deviceStatusTaskPluginConfig = gson.fromJson(deviceStatusTaskConfigStr, DeviceStatusTaskPluginConfig.class);
try {
List<EnrolmentInfo> enrolmentInfoTobeUpdated = new ArrayList<>();
List<DeviceMonitoringData> allDevicesForMonitoring = getAllDevicesForMonitoring();
@ -102,10 +95,10 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask {
EnrolmentInfo enrolmentInfo = monitoringData.getDevice().getEnrolmentInfo();
EnrolmentInfo.Status status = null;
if (lastUpdatedTime >= this.deviceStatusTaskPluginConfig
if (lastUpdatedTime >= deviceStatusTaskPluginConfig
.getIdleTimeToMarkInactive()) {
status = EnrolmentInfo.Status.INACTIVE;
} else if (lastUpdatedTime >= this.deviceStatusTaskPluginConfig
} else if (lastUpdatedTime >= deviceStatusTaskPluginConfig
.getIdleTimeToMarkUnreachable()) {
status = EnrolmentInfo.Status.UNREACHABLE;
}

@ -48,7 +48,6 @@ import org.wso2.carbon.device.mgt.core.task.DeviceMgtTaskException;
import org.wso2.carbon.device.mgt.core.task.DeviceTaskManager;
import java.util.List;
import java.util.Map;
public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask {
@ -56,14 +55,9 @@ public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask {
private String deviceType;
private DeviceManagementProviderService deviceManagementProviderService;
@Override
public void setProperties(Map<String, String> map) {
super.setProperties(map);
deviceType = map.get("DEVICE_TYPE");
}
@Override
public void executeDynamicTask() {
deviceType = getProperty("DEVICE_TYPE");
deviceManagementProviderService = DeviceManagementDataHolder.getInstance()
.getDeviceManagementProvider();
OperationMonitoringTaskConfig operationMonitoringTaskConfig = deviceManagementProviderService

@ -19,10 +19,11 @@
package org.wso2.carbon.device.mgt.core.task.impl;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder;
import org.wso2.carbon.ntask.core.Task;
@ -37,11 +38,11 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
private Map<String, String> properties;
@Override
public void setProperties(Map<String, String> properties) {
public final void setProperties(Map<String, String> properties) {
this.properties = properties;
}
public String getProperty(String name) {
public final String getProperty(String name) {
if (properties == null) {
return null;
}
@ -70,7 +71,36 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
@Override
public final void execute() {
refreshContext();
if (taskContext != null && taskContext.isPartitioningEnabled()) {
String localHashIndex = getProperty(TaskMgtConstants.Task.LOCAL_HASH_INDEX);
// These tasks are not dynamically scheduled. They are added via a config so scheduled in each node
// during the server startup
if (localHashIndex == null ) {
if (log.isDebugEnabled()) {
log.debug("Executing startup scheduled task (" + getTaskName() + ")");
}
executeDynamicTask();
return;
}
if (localHashIndex.equals(String.valueOf(taskContext.getServerHashIndex()))) {
if (log.isDebugEnabled()) {
log.debug("Executing dynamically scheduled task (" + getTaskName() +
") for current server hash index: " + localHashIndex);
}
executeDynamicTask();
} else {
if (log.isDebugEnabled()) {
log.debug("Ignoring execution of task (" + getTaskName() +
") not belonging to current serer hash index: " + localHashIndex);
}
}
} else {
executeDynamicTask();
}
}
public String getTaskName() {
return getProperty(TaskMgtConstants.Task.LOCAL_TASK_NAME);
}
public void refreshContext() {

@ -17,7 +17,7 @@
*/
package io.entgra.task.mgt.common.constant;
public class TaskMgtConstant {
public class TaskMgtConstants {
public static final class DataSourceProperties {
private DataSourceProperties() {
throw new AssertionError();
@ -46,7 +46,8 @@ public class TaskMgtConstant {
public static final String NAME_SEPARATOR = "_";
public static final String PROPERTY_KEY_COLUMN_NAME = "PROPERTY_NAME";
public static final String PROPERTY_VALUE_COLUMN_NAME = "PROPERTY_VALUE";
public static final String __TENANT_ID_PROP__ = "__TENANT_ID_PROP__";
public static final String TENANT_ID_PROP = "__TENANT_ID_PROP__";
public static final String LOCAL_HASH_INDEX = "__LOCAL_HASH_INDEX__";
public static final String LOCAL_TASK_NAME = "__LOCAL_TASK_NAME__";
}
}

@ -63,10 +63,12 @@
javax.naming,
io.entgra.task.mgt.common.*,
org.wso2.carbon.utils.*,
org.wso2.carbon.ntask.*,
org.wso2.carbon.device.mgt.core.*,
org.wso2.carbon.ntask.core.*,
org.wso2.carbon.ntask.common,
org.wso2.carbon.device.mgt.common.*,
org.wso2.carbon.context,
org.apache.commons.codec.binary;version="${commons-codec.wso2.osgi.version.range}",
org.apache.commons.codec.digest;version="${commons-codec.wso2.osgi.version.range}",
io.entgra.server.bootup.heartbeat.beacon.dto,
io.entgra.server.bootup.heartbeat.beacon.exception,
io.entgra.server.bootup.heartbeat.beacon.service,
@ -113,8 +115,8 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>org.wso2.carbon.device.mgt.core</artifactId>
<groupId>commons-codec.wso2</groupId>
<artifactId>commons-codec</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
@ -127,6 +129,13 @@
<artifactId>io.entgra.server.bootup.heartbeat.beacon</artifactId>
<scope>provided</scope>
</dependency>
<!--nTask dependencies-->
<dependency>
<groupId>org.wso2.carbon.commons</groupId>
<artifactId>org.wso2.carbon.ntask.core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

@ -17,7 +17,7 @@
*/
package io.entgra.task.mgt.core.config;
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
import io.entgra.task.mgt.common.exception.TaskManagementException;
import io.entgra.task.mgt.core.util.TaskManagementUtil;
import org.w3c.dom.Document;
@ -38,7 +38,7 @@ public class TaskConfigurationManager {
private static final String TASK_MGT_CONFIG_PATH =
CarbonUtils.getCarbonConfigDirPath() + File.separator +
TaskMgtConstant.DataSourceProperties.TASK_CONFIG_XML_NAME;
TaskMgtConstants.DataSourceProperties.TASK_CONFIG_XML_NAME;
public static TaskConfigurationManager getInstance() {
if (taskConfigurationManager == null) {

@ -17,7 +17,7 @@
*/
package io.entgra.task.mgt.core.dao.common;
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
import io.entgra.task.mgt.common.exception.IllegalTransactionStateException;
import io.entgra.task.mgt.common.exception.TransactionManagementException;
import io.entgra.task.mgt.common.exception.UnsupportedDatabaseEngineException;
@ -25,9 +25,9 @@ import io.entgra.task.mgt.core.config.datasource.DataSourceConfig;
import io.entgra.task.mgt.core.config.datasource.JNDILookupDefinition;
import io.entgra.task.mgt.core.dao.DynamicTaskDAO;
import io.entgra.task.mgt.core.dao.DynamicTaskPropDAO;
import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil;
import io.entgra.task.mgt.core.dao.impl.DynamicTaskDAOImpl;
import io.entgra.task.mgt.core.dao.impl.DynamicTaskPropDAOImpl;
import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -48,8 +48,8 @@ public class TaskManagementDAOFactory {
public static DynamicTaskDAO getDynamicTaskDAO() {
if (databaseEngine != null) {
switch (databaseEngine) {
case TaskMgtConstant.DataBaseTypes.DB_TYPE_H2:
case TaskMgtConstant.DataBaseTypes.DB_TYPE_MYSQL:
case TaskMgtConstants.DataBaseTypes.DB_TYPE_H2:
case TaskMgtConstants.DataBaseTypes.DB_TYPE_MYSQL:
return new DynamicTaskDAOImpl();
default:
throw new UnsupportedDatabaseEngineException("Unsupported database engine : " + databaseEngine);
@ -61,8 +61,8 @@ public class TaskManagementDAOFactory {
public static DynamicTaskPropDAO getDynamicTaskPropDAO() {
if (databaseEngine != null) {
switch (databaseEngine) {
case TaskMgtConstant.DataBaseTypes.DB_TYPE_H2:
case TaskMgtConstant.DataBaseTypes.DB_TYPE_MYSQL:
case TaskMgtConstants.DataBaseTypes.DB_TYPE_H2:
case TaskMgtConstants.DataBaseTypes.DB_TYPE_MYSQL:
return new DynamicTaskPropDAOImpl();
default:
throw new UnsupportedDatabaseEngineException("Unsupported database engine : " + databaseEngine);

@ -17,15 +17,14 @@
*/
package io.entgra.task.mgt.core.dao.impl;
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
import io.entgra.task.mgt.common.exception.TaskManagementDAOException;
import io.entgra.task.mgt.core.dao.DynamicTaskPropDAO;
import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil;
import io.entgra.task.mgt.core.dao.common.TaskManagementDAOFactory;
import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.core.dao.util.DeviceManagementDAOUtil;
import java.sql.Connection;
import java.sql.PreparedStatement;
@ -79,8 +78,8 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
resultSet = stmt.executeQuery();
properties = new HashMap<>();
while (resultSet.next()) {
properties.put(resultSet.getString(TaskMgtConstant.Task.PROPERTY_KEY_COLUMN_NAME)
, resultSet.getString(TaskMgtConstant.Task.PROPERTY_VALUE_COLUMN_NAME));
properties.put(resultSet.getString(TaskMgtConstants.Task.PROPERTY_KEY_COLUMN_NAME)
, resultSet.getString(TaskMgtConstants.Task.PROPERTY_VALUE_COLUMN_NAME));
}
} catch (SQLException e) {
String msg = "Error occurred while fetching task properties of : '" + dynamicTaskId + "'";
@ -119,7 +118,7 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
throw new TaskManagementDAOException
("Error occurred while updating device properties to database.", e);
} finally {
DeviceManagementDAOUtil.cleanupResources(stmt, null);
TaskManagementDAOUtil.cleanupResources(stmt, null);
}
}
}

@ -20,7 +20,6 @@ package io.entgra.task.mgt.core.dao.util;
import io.entgra.task.mgt.common.bean.DynamicTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.Device;
import javax.naming.InitialContext;
import javax.sql.DataSource;

@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, Entgra Pvt Ltd. (http://www.wso2.org) All Rights Reserved.
* Copyright (c) 2023, Entgra Pvt Ltd. (https://entgra.io) All Rights Reserved.
*
* Entgra Pvt Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
@ -17,11 +17,12 @@
*/
package io.entgra.task.mgt.core.service;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.task.mgt.common.bean.DynamicTask;
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
import io.entgra.task.mgt.common.exception.TaskNotFoundException;
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
import io.entgra.task.mgt.common.exception.TaskManagementDAOException;
import io.entgra.task.mgt.common.exception.TaskManagementException;
import io.entgra.task.mgt.common.exception.TaskNotFoundException;
import io.entgra.task.mgt.common.exception.TransactionManagementException;
import io.entgra.task.mgt.common.spi.TaskManagementService;
import io.entgra.task.mgt.core.dao.DynamicTaskDAO;
@ -62,13 +63,13 @@ public class TaskManagementServiceImpl implements TaskManagementService {
throw new TaskManagementException(msg);
}
if (!nTaskService.getRegisteredTaskTypes().contains(
TaskMgtConstant.Task.DYNAMIC_TASK_TYPE)) {
TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
try {
nTaskService.registerTaskType(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE);
this.taskManager = nTaskService.getTaskManager(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE);
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
this.taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
} catch (TaskException e) {
String msg = "Error occurred while registering task type ["
+ TaskMgtConstant.Task.DYNAMIC_TASK_TYPE
+ TaskMgtConstants.Task.DYNAMIC_TASK_TYPE
+ "], hence unable to schedule the task.";
log.error(msg);
throw new TaskManagementException(msg, e);
@ -90,6 +91,17 @@ public class TaskManagementServiceImpl implements TaskManagementService {
// add into the ntask core
taskId = TaskManagementUtil.generateTaskId(dynamicTaskId);
try {
int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx();
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, taskId);
} catch (HeartBeatManagementException e) {
String msg = "Unexpected exception when getting server hash index.";
log.error(msg, e);
throw new TaskManagementException(msg, e);
}
if (!isTaskExists(taskId)) {
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
triggerInfo.setCronExpression(dynamicTask.getCronExpression());
@ -101,7 +113,7 @@ public class TaskManagementServiceImpl implements TaskManagementService {
}
} else {
String msg = "Task '" + taskId + "' is already exists in the ntask core "
+ "Hence cannot create another task for the same.";
+ "Hence not creating another task for the same name.";
log.error(msg);
}

@ -19,7 +19,7 @@ package io.entgra.task.mgt.core.util;
import com.google.gson.Gson;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
import io.entgra.task.mgt.common.exception.TaskManagementException;
import io.entgra.task.mgt.core.internal.TaskManagerDataHolder;
import org.apache.commons.codec.digest.DigestUtils;
@ -31,7 +31,6 @@ import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File;
import java.io.IOException;
import java.util.Map;
/**
@ -60,8 +59,8 @@ public class TaskManagementUtil {
try {
int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
.getServerCtxInfo().getLocalServerHashIdx();
return TaskMgtConstant.Task.DYNAMIC_TASK_TYPE + TaskMgtConstant.Task.NAME_SEPARATOR + dynamicTaskId
+ TaskMgtConstant.Task.NAME_SEPARATOR + serverHashIdx;
return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId
+ TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx;
} catch (HeartBeatManagementException e) {
String msg = "Failed to generate task id for a dynamic task " + dynamicTaskId;
log.error(msg, e);
@ -70,11 +69,12 @@ public class TaskManagementUtil {
}
public static String generateTaskPropsMD5(Map<String, String> taskProperties) throws TaskManagementException {
if (taskProperties.containsKey(TaskMgtConstant.Task.__TENANT_ID_PROP__)) {
taskProperties.remove(TaskMgtConstant.Task.__TENANT_ID_PROP__);
}
taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP);
taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX);
taskProperties.remove(TaskMgtConstants.Task.LOCAL_TASK_NAME);
Gson gson = new Gson();
String json = gson.toJson(taskProperties);
return DigestUtils.md5Hex(json);
}
}

@ -20,7 +20,7 @@ package io.entgra.task.mgt.watcher;
import io.entgra.task.mgt.common.bean.DynamicTask;
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
import io.entgra.task.mgt.common.exception.TaskManagementException;
import io.entgra.task.mgt.core.util.TaskManagementUtil;
import io.entgra.task.mgt.watcher.internal.TaskWatcherDataHolder;
@ -64,10 +64,10 @@ public class IoTSStartupHandler implements ServerStartupObserver {
}
try {
if (!nTaskService.getRegisteredTaskTypes().contains(
TaskMgtConstant.Task.DYNAMIC_TASK_TYPE)) {
nTaskService.registerTaskType(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE);
TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
}
taskManager = nTaskService.getTaskManager(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE);
taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
List<DynamicTask> dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService()
.getAllDynamicTasks();

Loading…
Cancel
Save