diff --git a/components/application-mgt/org.wso2.carbon.device.application.mgt.core/src/main/java/org/wso2/carbon/device/application/mgt/core/task/ScheduledAppSubscriptionCleanupTask.java b/components/application-mgt/org.wso2.carbon.device.application.mgt.core/src/main/java/org/wso2/carbon/device/application/mgt/core/task/ScheduledAppSubscriptionCleanupTask.java index 38077daf93..113b16cf3f 100644 --- a/components/application-mgt/org.wso2.carbon.device.application.mgt.core/src/main/java/org/wso2/carbon/device/application/mgt/core/task/ScheduledAppSubscriptionCleanupTask.java +++ b/components/application-mgt/org.wso2.carbon.device.application.mgt.core/src/main/java/org/wso2/carbon/device/application/mgt/core/task/ScheduledAppSubscriptionCleanupTask.java @@ -23,13 +23,14 @@ import org.apache.commons.logging.LogFactory; import org.wso2.carbon.device.application.mgt.common.exception.SubscriptionManagementException; import org.wso2.carbon.device.application.mgt.common.services.SubscriptionManager; import org.wso2.carbon.device.application.mgt.core.impl.SubscriptionManagerImpl; -import org.wso2.carbon.ntask.core.Task; +import org.wso2.carbon.device.mgt.core.task.impl.RandomlyAssignedScheduleTask; import java.util.Map; -public class ScheduledAppSubscriptionCleanupTask implements Task { +public class ScheduledAppSubscriptionCleanupTask extends RandomlyAssignedScheduleTask { private static Log log = LogFactory.getLog(ScheduledAppSubscriptionCleanupTask.class); private SubscriptionManager subscriptionManager; + private static final String TASK_NAME = "SCHEDULE_APP_SUBSCRIPTION_CLEANUP"; @Override public void setProperties(Map properties) { @@ -37,18 +38,25 @@ public class ScheduledAppSubscriptionCleanupTask implements Task { } @Override - public void init() { + public void executeRandomlyAssignedTask() { + try { + if(super.isQualifiedToExecuteTask()) { + subscriptionManager.cleanScheduledSubscriptions(); + } + } catch (SubscriptionManagementException e) { + log.error("Error occurred while cleaning up tasks."); + } + } + + @Override + protected void setup() { if (this.subscriptionManager == null) { this.subscriptionManager = new SubscriptionManagerImpl(); } } @Override - public void execute() { - try { - subscriptionManager.cleanScheduledSubscriptions(); - } catch (SubscriptionManagementException e) { - log.error("Error occurred while cleaning up tasks."); - } + public String getTaskName() { + return TASK_NAME; } } diff --git a/components/application-mgt/org.wso2.carbon.device.application.mgt.core/src/main/java/org/wso2/carbon/device/application/mgt/core/task/ScheduledAppSubscriptionTask.java b/components/application-mgt/org.wso2.carbon.device.application.mgt.core/src/main/java/org/wso2/carbon/device/application/mgt/core/task/ScheduledAppSubscriptionTask.java index afbab64d6d..caff4ec460 100644 --- a/components/application-mgt/org.wso2.carbon.device.application.mgt.core/src/main/java/org/wso2/carbon/device/application/mgt/core/task/ScheduledAppSubscriptionTask.java +++ b/components/application-mgt/org.wso2.carbon.device.application.mgt.core/src/main/java/org/wso2/carbon/device/application/mgt/core/task/ScheduledAppSubscriptionTask.java @@ -33,15 +33,17 @@ import org.wso2.carbon.device.application.mgt.common.services.SubscriptionManage import org.wso2.carbon.device.application.mgt.core.impl.SubscriptionManagerImpl; import org.wso2.carbon.device.application.mgt.core.util.Constants; import org.wso2.carbon.device.mgt.common.DeviceIdentifier; -import org.wso2.carbon.ntask.core.Task; +import org.wso2.carbon.device.mgt.core.task.impl.RandomlyAssignedScheduleTask; import java.util.List; import java.util.Map; import java.util.regex.Pattern; import java.util.stream.Collectors; -public class ScheduledAppSubscriptionTask implements Task { +public class ScheduledAppSubscriptionTask extends RandomlyAssignedScheduleTask { private static Log log = LogFactory.getLog(ScheduledAppSubscriptionTask.class); + private static final String TASK_NAME = "SCHEDULE_APP_SUBSCRIPTION"; + private SubscriptionManager subscriptionManager; private String subscribers; private String subscriptionType; @@ -65,68 +67,75 @@ public class ScheduledAppSubscriptionTask implements Task { } @Override - public void init() { - if (this.subscriptionManager == null) { - this.subscriptionManager = new SubscriptionManagerImpl(); - } - } - - @Override - public void execute() { - try { - ScheduledSubscriptionDTO subscriptionDTO = subscriptionManager.getPendingScheduledSubscription( - this.taskName); - if (subscriptionDTO == null) { - log.error("Unable to execute the task. Task entry for [" + this.taskName + "] cannot be retrieved " + - "from the database."); - return; - } - if (StringUtils.isNotEmpty(this.subscribers)) { - PrivilegedCarbonContext.startTenantFlow(); - PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext(); - carbonContext.setTenantDomain(this.tenantDomain); - carbonContext.setTenantId(this.tenantId); - carbonContext.setUsername(this.subscriber); + public void executeRandomlyAssignedTask() { + if(super.isQualifiedToExecuteTask()) { + try { + ScheduledSubscriptionDTO subscriptionDTO = subscriptionManager.getPendingScheduledSubscription( + this.taskName); + if (subscriptionDTO == null) { + log.error("Unable to execute the task. Task entry for [" + this.taskName + "] cannot be retrieved " + + "from the database."); + return; + } + if (StringUtils.isNotEmpty(this.subscribers)) { + PrivilegedCarbonContext.startTenantFlow(); + PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext(); + carbonContext.setTenantDomain(this.tenantDomain); + carbonContext.setTenantId(this.tenantId); + carbonContext.setUsername(this.subscriber); - if (this.subscriptionType.equals(SubscriptionType.DEVICE.toString())) { - List deviceIdentifiers = new Gson().fromJson(this.subscribers, - new TypeToken>() { - }.getType()); - try { - subscriptionManager.performBulkAppOperation(this.application, deviceIdentifiers, - this.subscriptionType, this.action); - subscriptionDTO.setStatus(ExecutionStatus.EXECUTED); - } catch (ApplicationManagementException e) { - log.error( - "Error occurred while " + this.action + "ing application " + this.application - + "to/from the following devices: " + this.subscribers, e); - subscriptionDTO.setStatus(ExecutionStatus.FAILED); + if (this.subscriptionType.equals(SubscriptionType.DEVICE.toString())) { + List deviceIdentifiers = new Gson().fromJson(this.subscribers, + new TypeToken>() { + }.getType()); + try { + subscriptionManager.performBulkAppOperation(this.application, deviceIdentifiers, + this.subscriptionType, this.action); + subscriptionDTO.setStatus(ExecutionStatus.EXECUTED); + } catch (ApplicationManagementException e) { + log.error( + "Error occurred while " + this.action + "ing application " + this.application + + "to/from the following devices: " + this.subscribers, e); + subscriptionDTO.setStatus(ExecutionStatus.FAILED); + } + } else { + List subscriberList = Pattern.compile(",").splitAsStream(this.subscribers).collect( + Collectors.toList()); + try { + subscriptionManager.performBulkAppOperation(this.application, subscriberList, + this.subscriptionType, this.action); + subscriptionDTO.setStatus(ExecutionStatus.EXECUTED); + } catch (ApplicationManagementException e) { + log.error( + "Error occurred while " + this.action + "ing application " + this.application + + "to/from the following " + this.subscriptionType + "s: " + this.subscribers, e); + subscriptionDTO.setStatus(ExecutionStatus.FAILED); + } } } else { - List subscriberList = Pattern.compile(",").splitAsStream(this.subscribers).collect( - Collectors.toList()); - try { - subscriptionManager.performBulkAppOperation(this.application, subscriberList, - this.subscriptionType, this.action); - subscriptionDTO.setStatus(ExecutionStatus.EXECUTED); - } catch (ApplicationManagementException e) { - log.error( - "Error occurred while " + this.action + "ing application " + this.application - + "to/from the following " + this.subscriptionType + "s: " + this.subscribers, e); - subscriptionDTO.setStatus(ExecutionStatus.FAILED); - } + log.warn( + "Subscriber list is empty. Therefore skipping scheduled task to " + this.action + "application " + + this.application); + subscriptionDTO.setStatus(ExecutionStatus.FAILED); } - } else { - log.warn( - "Subscriber list is empty. Therefore skipping scheduled task to " + this.action + "application " - + this.application); - subscriptionDTO.setStatus(ExecutionStatus.FAILED); + subscriptionManager.updateScheduledSubscriptionStatus(subscriptionDTO.getId(), subscriptionDTO.getStatus()); + } catch (SubscriptionManagementException e) { + log.error("Error occurred while executing the task: " + this.taskName, e); + } finally { + PrivilegedCarbonContext.endTenantFlow(); } - subscriptionManager.updateScheduledSubscriptionStatus(subscriptionDTO.getId(), subscriptionDTO.getStatus()); - } catch (SubscriptionManagementException e) { - log.error("Error occurred while executing the task: " + this.taskName, e); - } finally { - PrivilegedCarbonContext.endTenantFlow(); } } + + @Override + protected void setup() { + if (this.subscriptionManager == null) { + this.subscriptionManager = new SubscriptionManagerImpl(); + } + } + + @Override + public String getTaskName() { + return TASK_NAME; + } } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java index 9aa430ee61..ce2ffddfec 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/status/task/impl/DeviceStatusMonitoringTask.java @@ -68,14 +68,13 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask { } @Override - public void execute() { + public void executeDynamicTask() { List operationEnrolmentMappings; List enrolmentInfoTobeUpdated = new ArrayList<>(); Map lastActivities = null; EnrolmentInfo enrolmentInfo; DeviceIdentifier deviceIdentifier; Device device; - super.refreshContext(); try { operationEnrolmentMappings = this.getOperationEnrolmentMappings(super.getTaskContext()); if (operationEnrolmentMappings.size() > 0) { diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java index 2c4425ff22..360325e9ab 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DeviceDetailsRetrieverTask.java @@ -65,8 +65,7 @@ public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask { } @Override - public void execute() { - super.refreshContext(); + public void executeDynamicTask() { deviceManagementProviderService = DeviceManagementDataHolder.getInstance() .getDeviceManagementProvider(); OperationMonitoringTaskConfig operationMonitoringTaskConfig = deviceManagementProviderService diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java index d124874dbe..2013105b6e 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/DynamicPartitionedScheduleTask.java @@ -36,22 +36,12 @@ public abstract class DynamicPartitionedScheduleTask implements Task { @Override public final void init() { try { - ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo(); - if(ctxInfo!=null){ + boolean dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled(); + if(dynamicTaskEnabled){ taskContext = new DynamicTaskContext(); - updateContext(ctxInfo); - - if(ctxInfo.getActiveServerCount() > 0){ - taskContext.setPartitioningEnabled(true); - } - - if(log.isDebugEnabled()){ - log.debug("Initiating execution of dynamic task for server : " + taskContext.getServerHashIndex() + - " where active server count is : " + taskContext.getActiveServerCount() + - " partitioning task enabled : " + taskContext.isPartitioningEnabled()); - } + taskContext.setPartitioningEnabled(true); } else { - log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function."); + log.info("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function."); } } catch (HeartBeatManagementException e) { log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function." , e); @@ -59,29 +49,56 @@ public abstract class DynamicPartitionedScheduleTask implements Task { setup(); } - public DynamicTaskContext refreshContext(){ - try { - ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo(); - if(ctxInfo != null) { - updateContext(ctxInfo); - } else { - log.info("Dynamic Task Context not present. Tasks will run on regular worker/manager mode."); + @Override + public final void execute() { + refreshContext(); + executeDynamicTask(); + } + + public void refreshContext(){ + if(taskContext != null && taskContext.isPartitioningEnabled()) { + try { + updateContext(); + } catch (HeartBeatManagementException e) { + log.error("Error refreshing Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.", e); } - } catch (HeartBeatManagementException e) { - log.error("Error refreshing Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.", e); } - return taskContext; } - private void updateContext(ServerCtxInfo ctxInfo) { + private void updateContext() throws HeartBeatManagementException { + ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo(); + if(ctxInfo != null) { + populateContext(ctxInfo); + } else { + log.info("Dynamic Task Context not present. Tasks will run on regular worker/manager mode."); + } + } + + private void populateContext(ServerCtxInfo ctxInfo) { taskContext.setActiveServerCount(ctxInfo.getActiveServerCount()); taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx()); + + if(log.isDebugEnabled()){ + log.debug("Initiating execution of dynamic task for server : " + taskContext.getServerHashIndex() + + " where active server count is : " + taskContext.getActiveServerCount() + + " partitioning task enabled : " + taskContext.isPartitioningEnabled()); + } } protected abstract void setup(); + protected abstract void executeDynamicTask(); + public static DynamicTaskContext getTaskContext() { return taskContext; } + public static boolean isDynamicTaskEligible(){ + if(taskContext != null && taskContext.isPartitioningEnabled()) { + return true; + } else { + return false; + } + } + } diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/RandomlyAssignedScheduleTask.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/RandomlyAssignedScheduleTask.java new file mode 100644 index 0000000000..56a1c498a0 --- /dev/null +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/main/java/org/wso2/carbon/device/mgt/core/task/impl/RandomlyAssignedScheduleTask.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2020, Entgra (pvt) Ltd. (http://entgra.io) All Rights Reserved. + * + * Entgra (pvt) Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.carbon.device.mgt.core.task.impl; + +import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder; +import org.wso2.carbon.ntask.core.Task; + + +public abstract class RandomlyAssignedScheduleTask implements Task { + + private static final Log log = LogFactory.getLog(RandomlyAssignedScheduleTask.class); + + private static String taskName = "UNSPECIFIED"; + private static boolean qualifiedToExecuteTask = false; + private static boolean dynamicTaskEnabled = false; + + @Override + public final void init() { + try { + dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled(); + } catch (HeartBeatManagementException e) { + log.error("Error Instantiating Variables necessary for Randomly Assigned Task Scheduling." , e); + } + //This is done so that sub class extending this abstract class is forced to specify a task name. + taskName = getTaskName(); + setup(); + } + + @Override + public final void execute() { + refreshContext(); + executeRandomlyAssignedTask(); + } + + public void refreshContext(){ + if(dynamicTaskEnabled) { + try { + qualifiedToExecuteTask = DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask(); + log.info("## NODE Qualified to execute Randomly Assigned Task : " + taskName); + DeviceManagementDataHolder.getInstance().getHeartBeatService().updateTaskExecutionAcknowledgement(taskName); + } catch (HeartBeatManagementException e) { + log.error("Error refreshing Variables necessary for Randomly Assigned Scheduled Task. " + + "Dynamic Tasks will not function.", e); + } + } + } + + protected abstract void setup(); + + protected abstract void executeRandomlyAssignedTask(); + + public static boolean isQualifiedToExecuteTask() { + return qualifiedToExecuteTask; + } + + public abstract String getTaskName(); +} diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/TestHeartBeatManagementService.java b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/TestHeartBeatManagementService.java index 1d1efa10ec..54dcc96504 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/TestHeartBeatManagementService.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.core/src/test/java/org/wso2/carbon/device/mgt/core/TestHeartBeatManagementService.java @@ -7,6 +7,11 @@ import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementServi import org.wso2.carbon.device.mgt.common.ServerCtxInfo; public class TestHeartBeatManagementService implements HeartBeatManagementService { + @Override + public boolean isTaskPartitioningEnabled() throws HeartBeatManagementException { + return false; + } + @Override public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException { return null; @@ -21,4 +26,20 @@ public class TestHeartBeatManagementService implements HeartBeatManagementServic public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException { return false; } + + @Override + public void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException { + + } + + @Override + public boolean updateTaskExecutionAcknowledgement(String newTask) + throws HeartBeatManagementException { + return false; + } + + @Override + public boolean isQualifiedToExecuteTask() throws HeartBeatManagementException { + return false; + } } diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/HeartBeatDAO.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/HeartBeatDAO.java index 4948f95792..9013e443f5 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/HeartBeatDAO.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/HeartBeatDAO.java @@ -19,9 +19,11 @@ package io.entgra.server.bootup.heartbeat.beacon.dao; import io.entgra.server.bootup.heartbeat.beacon.dao.exception.HeartBeatDAOException; +import io.entgra.server.bootup.heartbeat.beacon.dto.ElectedCandidate; import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent; import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; +import java.util.List; import java.util.Map; /** @@ -39,4 +41,12 @@ public interface HeartBeatDAO { Map getActiveServerDetails(int elapsedTimeInSeconds) throws HeartBeatDAOException; + boolean recordElectedCandidate(String serverUUID) throws HeartBeatDAOException; + + void purgeCandidates() throws HeartBeatDAOException; + + ElectedCandidate retrieveCandidate() throws HeartBeatDAOException; + + boolean acknowledgeTask(String uuid, List taskList) throws HeartBeatDAOException; + } diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/impl/GenericHeartBeatDAOImpl.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/impl/GenericHeartBeatDAOImpl.java index c6aaf04872..ae8bbd247f 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/impl/GenericHeartBeatDAOImpl.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/impl/GenericHeartBeatDAOImpl.java @@ -22,8 +22,11 @@ import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory; import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatDAO; import io.entgra.server.bootup.heartbeat.beacon.dao.exception.HeartBeatDAOException; import io.entgra.server.bootup.heartbeat.beacon.dao.util.HeartBeatBeaconDAOUtil; +import io.entgra.server.bootup.heartbeat.beacon.dto.ElectedCandidate; import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent; import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.sql.Connection; import java.sql.PreparedStatement; @@ -32,6 +35,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -41,6 +45,8 @@ import java.util.concurrent.TimeUnit; */ public class GenericHeartBeatDAOImpl implements HeartBeatDAO { + private static final Log log = LogFactory.getLog(GenericHeartBeatDAOImpl.class); + @Override public String recordServerCtx(ServerContext ctx) throws HeartBeatDAOException { PreparedStatement stmt = null; @@ -69,6 +75,83 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO { return uuid; } + @Override + public boolean recordElectedCandidate(String serverUUID) throws HeartBeatDAOException { + PreparedStatement stmt = null; + try { + Connection conn = HeartBeatBeaconDAOFactory.getConnection(); + String sql; + sql = "INSERT INTO ELECTED_LEADER_META_INFO(UUID) VALUES (?)"; + stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); + stmt.setString(1, serverUUID); + + return stmt.executeUpdate() > 0; + } catch (SQLException e) { + throw new HeartBeatDAOException("Error occurred while persisting UUID of chosen " + + "elected dynamic task execution candidate : " + serverUUID , e); + } finally { + HeartBeatBeaconDAOUtil.cleanupResources(stmt, null); + } + } + + @Override + public void purgeCandidates() throws HeartBeatDAOException { + Statement stmt = null; + try { + Connection conn = HeartBeatBeaconDAOFactory.getConnection(); + conn.setAutoCommit(false); + String sql = "TRUNCATE TABLE ELECTED_LEADER_META_INFO"; + stmt = conn.createStatement(); + stmt.execute(sql); + conn.commit(); + } catch (SQLException e) { + throw new HeartBeatDAOException("Error occurred while truncating ELECTED_LEADER_META_INFO table.", e); + } finally { + HeartBeatBeaconDAOUtil.cleanupResources(stmt, null); + } + } + + @Override + public ElectedCandidate retrieveCandidate() throws HeartBeatDAOException { + Statement stmt = null; + ResultSet resultSet = null; + ElectedCandidate candidate = null; + try { + Connection conn = HeartBeatBeaconDAOFactory.getConnection(); + String sql = "SELECT * from ELECTED_LEADER_META_INFO"; + stmt = conn.createStatement(); + resultSet = stmt.executeQuery(sql); + while (resultSet.next()){ + candidate = HeartBeatBeaconDAOUtil.populateCandidate(resultSet); + } + } catch (SQLException e) { + throw new HeartBeatDAOException("Error occurred while retrieving meta information of elected candidate", e); + } finally { + HeartBeatBeaconDAOUtil.cleanupResources(stmt, resultSet); + } + return candidate; + } + + @Override + public boolean acknowledgeTask(String uuid, List taskList) throws HeartBeatDAOException { + PreparedStatement stmt = null; + try { + Connection conn = HeartBeatBeaconDAOFactory.getConnection(); + String sql; + sql = "UPDATE ELECTED_LEADER_META_INFO SET ACKNOWLEDGED_TASK_LIST = ? WHERE UUID = ?"; + stmt = conn.prepareStatement(sql, new String[]{"UUID"}); + stmt.setString(1, String.join(",", taskList)); + stmt.setString(2, uuid); + + return stmt.executeUpdate() > 0; + } catch (SQLException e) { + throw new HeartBeatDAOException("Error occurred while updating task list of elected server : '" + + uuid + "' and task list " + taskList, e); + } finally { + HeartBeatBeaconDAOUtil.cleanupResources(stmt, null); + } + } + @Override public boolean recordHeatBeat(HeartBeatEvent event) throws HeartBeatDAOException { PreparedStatement stmt = null; @@ -152,8 +235,7 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO { "WHERE LAST_UPDATED_TIMESTAMP > ? " + "ORDER BY UUID"; stmt = conn.prepareStatement(sql); - stmt.setInt(1, elapsedTimeInSeconds); - stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds))); + stmt.setTimestamp(1, new Timestamp(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds))); resultSet = stmt.executeQuery(); while (resultSet.next()) { ctxList.put(resultSet.getString("UUID"), HeartBeatBeaconDAOUtil.populateContext(resultSet)); diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/util/HeartBeatBeaconDAOUtil.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/util/HeartBeatBeaconDAOUtil.java index 211a10635b..c131c3d43b 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/util/HeartBeatBeaconDAOUtil.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dao/util/HeartBeatBeaconDAOUtil.java @@ -18,6 +18,7 @@ package io.entgra.server.bootup.heartbeat.beacon.dao.util; +import io.entgra.server.bootup.heartbeat.beacon.dto.ElectedCandidate; import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,6 +28,8 @@ import javax.sql.DataSource; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; import java.util.Hashtable; /** @@ -59,6 +62,29 @@ public final class HeartBeatBeaconDAOUtil { } } + /** + * Cleanup resources used to transaction + * + * @param stmt Statement used + * @param rs Obtained results set + */ + public static void cleanupResources(Statement stmt, ResultSet rs) { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + log.warn("Error occurred while closing result set", e); + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + log.warn("Error occurred while closing prepared statement", e); + } + } + } + /** * Lookup datasource using name and jndi properties * @@ -88,4 +114,15 @@ public final class HeartBeatBeaconDAOUtil { ctx.setCarbonServerPort(resultSet.getInt("SERVER_PORT")); return ctx; } + + public static ElectedCandidate populateCandidate(ResultSet resultSet) throws SQLException { + ElectedCandidate candidate = new ElectedCandidate(); + candidate.setServerUUID(resultSet.getString("UUID")); + candidate.setTimeOfElection(resultSet.getTimestamp("ELECTED_TIME")); + String tasksList = resultSet.getString("ACKNOWLEDGED_TASK_LIST"); + if(tasksList != null && !tasksList.isEmpty()){ + candidate.setAcknowledgedTaskList(Arrays.asList(tasksList.split(","))); + } + return candidate; + } } diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dto/ElectedCandidate.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dto/ElectedCandidate.java new file mode 100644 index 0000000000..a3920d362b --- /dev/null +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/dto/ElectedCandidate.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2020, Entgra Pvt Ltd. (http://www.wso2.org) All Rights Reserved. + * + * Entgra Pvt Ltd. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.entgra.server.bootup.heartbeat.beacon.dto; + +import java.sql.Timestamp; +import java.util.List; + +public class ElectedCandidate { + + private String serverUUID; + private Timestamp timeOfElection; + private List acknowledgedTaskList = null; + + public List getAcknowledgedTaskList() { + return acknowledgedTaskList; + } + + public void setAcknowledgedTaskList(List acknowledgedTaskList) { + this.acknowledgedTaskList = acknowledgedTaskList; + } + + public String getServerUUID() { + return serverUUID; + } + + public void setServerUUID(String serverUUID) { + this.serverUUID = serverUUID; + } + + public Timestamp getTimeOfElection() { + return timeOfElection; + } + + public void setTimeOfElection(Timestamp timeOfElection) { + this.timeOfElection = timeOfElection; + } + +} diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java index 5d83a0acd3..dc111aef73 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatBeaconComponent.java @@ -47,7 +47,7 @@ public class HeartBeatBeaconComponent { protected void activate(ComponentContext componentContext) { try { if (log.isDebugEnabled()) { - log.debug("Initializing email sender core bundle"); + log.debug("Initializing heart beat management bundle"); } this.registerHeartBeatServices(componentContext); @@ -88,7 +88,7 @@ public class HeartBeatBeaconComponent { /* This is to avoid mobile device management component getting initialized before the underlying datasources are registered */ if (log.isDebugEnabled()) { - log.debug("Data source service set to mobile service component"); + log.debug("Data source service set to heart beat management component"); } } diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java index f215288852..1187513daa 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/internal/HeartBeatExecutor.java @@ -57,13 +57,17 @@ public class HeartBeatExecutor { uuid = HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().updateServerContext(ctx); HeartBeatBeaconUtils.saveUUID(uuid); } + int timeOutIntervalInSeconds = CONFIG.getServerTimeOutIntervalInSeconds(); + int timeSkew = CONFIG.getTimeSkew(); + int cumilativeTimeOut = timeOutIntervalInSeconds + timeSkew; final String designatedUUID = uuid; HeartBeatBeaconDataHolder.getInstance().setLocalServerUUID(designatedUUID); Runnable periodicTask = new Runnable() { public void run() { try { recordHeartBeat(designatedUUID); - } catch (HeartBeatManagementException e) { + electDynamicTaskExecutionCandidate(cumilativeTimeOut); + } catch (Exception e) { log.error("Error while executing record heart beat task. This will result in schedule operation malfunction.", e); } } @@ -83,4 +87,9 @@ public class HeartBeatExecutor { HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().recordHeartBeat(new HeartBeatEvent(uuid)); } + static void electDynamicTaskExecutionCandidate(int cumilativeTimeOut) throws HeartBeatManagementException { + HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().electCandidate(cumilativeTimeOut); + } + + } diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java index 90644cdd05..7038f6ee4c 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementService.java @@ -18,6 +18,7 @@ package io.entgra.server.bootup.heartbeat.beacon.service; +import io.entgra.server.bootup.heartbeat.beacon.dto.ElectedCandidate; import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent; import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; @@ -25,10 +26,18 @@ import org.wso2.carbon.device.mgt.common.ServerCtxInfo; public interface HeartBeatManagementService { + boolean isTaskPartitioningEnabled() throws HeartBeatManagementException; + ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException; String updateServerContext(ServerContext ctx) throws HeartBeatManagementException; boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException; + void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException; + + boolean updateTaskExecutionAcknowledgement(String newTask) throws HeartBeatManagementException; + + boolean isQualifiedToExecuteTask() throws HeartBeatManagementException; + } diff --git a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java index adc245e5ff..cd70914d46 100644 --- a/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java +++ b/components/heartbeat-management/io.entgra.server.bootup.heartbeat.beacon/src/main/java/io/entgra/server/bootup/heartbeat/beacon/service/HeartBeatManagementServiceImpl.java @@ -22,18 +22,29 @@ import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig; import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory; import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatDAO; import io.entgra.server.bootup.heartbeat.beacon.dao.exception.HeartBeatDAOException; +import io.entgra.server.bootup.heartbeat.beacon.dto.ElectedCandidate; import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent; import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext; import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException; import io.entgra.server.bootup.heartbeat.beacon.internal.HeartBeatBeaconDataHolder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.wso2.carbon.device.mgt.common.ServerCtxInfo; import org.wso2.carbon.device.mgt.common.exceptions.TransactionManagementException; import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; public class HeartBeatManagementServiceImpl implements HeartBeatManagementService { + private static final Log log = LogFactory.getLog(HeartBeatManagementServiceImpl.class); + private final HeartBeatDAO heartBeatDAO; public HeartBeatManagementServiceImpl(){ @@ -77,6 +88,19 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic return serverCtxInfo; } + @Override + public boolean isTaskPartitioningEnabled() throws HeartBeatManagementException { + boolean enabled = false; + if(HeartBeatBeaconConfig.getInstance() != null){ + enabled = HeartBeatBeaconConfig.getInstance().isEnabled(); + } else { + String msg = "Issue instantiating heart beat config."; + throw new HeartBeatManagementException(msg); + } + return enabled; + } + + @Override public String updateServerContext(ServerContext ctx) throws HeartBeatManagementException { String uuid = null; @@ -105,6 +129,124 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic return uuid; } + @Override + public boolean isQualifiedToExecuteTask() throws HeartBeatManagementException { + boolean isQualified = false; + if(HeartBeatBeaconConfig.getInstance().isEnabled()) { + try { + String localServerUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID(); + HeartBeatBeaconDAOFactory.openConnection(); + ElectedCandidate candidate = heartBeatDAO.retrieveCandidate(); + if(candidate != null && candidate.getServerUUID().equalsIgnoreCase(localServerUUID)){ + isQualified = true; + if(log.isDebugEnabled()){ + log.debug("Node : " + localServerUUID + " Qualified to execute randomly assigned task."); + } + } + } catch (HeartBeatDAOException e) { + String msg = "Error occurred while checking if server is qualified to execute randomly designated task."; + throw new HeartBeatManagementException(msg, e); + } catch (SQLException e) { + String msg = "Error occurred while opening a connection to the underlying data source"; + throw new HeartBeatManagementException(msg, e); + } finally { + HeartBeatBeaconDAOFactory.closeConnection(); + } + } else { + String msg = "Heart Beat Configuration Disabled. Error occurred while checking if server is qualified to execute randomly designated task."; + throw new HeartBeatManagementException(msg); + } + return isQualified; + } + + @Override + public boolean updateTaskExecutionAcknowledgement(String newTask) throws HeartBeatManagementException { + boolean result = false; + if(HeartBeatBeaconConfig.getInstance().isEnabled()) { + try { + String serverUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID(); + HeartBeatBeaconDAOFactory.beginTransaction(); + ElectedCandidate candidate = heartBeatDAO.retrieveCandidate(); + if(candidate != null && candidate.getServerUUID().equals(serverUUID)){ + List taskList = candidate.getAcknowledgedTaskList(); + boolean taskExecuted = false; + for(String task : taskList){ + if(task.equalsIgnoreCase(newTask)){ + taskExecuted = true; + break; + } + } + if(!taskExecuted) { + taskList.add(newTask); + result = heartBeatDAO.acknowledgeTask(serverUUID, taskList); + HeartBeatBeaconDAOFactory.commitTransaction(); + } + } + } catch (HeartBeatDAOException e) { + String msg = "Error occurred while updating acknowledged task."; + throw new HeartBeatManagementException(msg, e); + } catch (TransactionManagementException e) { + HeartBeatBeaconDAOFactory.rollbackTransaction(); + String msg = "Error occurred while updating acknowledged task.. Issue in opening a connection to the underlying data source"; + throw new HeartBeatManagementException(msg, e); + } finally { + HeartBeatBeaconDAOFactory.closeConnection(); + } + } else { + String msg = "Heart Beat Configuration Disabled. Updating acknowledged task list failed."; + throw new HeartBeatManagementException(msg); + } + return result; + } + + + @Override + public void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException { + if (HeartBeatBeaconConfig.getInstance().isEnabled()) { + try { + HeartBeatBeaconDAOFactory.beginTransaction(); + Map servers = heartBeatDAO.getActiveServerDetails(elapsedTimeInSeconds); + if (servers != null && !servers.isEmpty()) { + ElectedCandidate presentCandidate = heartBeatDAO.retrieveCandidate(); + if (presentCandidate != null && + presentCandidate.getTimeOfElection().before(new Timestamp(System.currentTimeMillis() + - TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)))) { + heartBeatDAO.purgeCandidates(); + electCandidate(servers); + } else { + electCandidate(servers); + } + HeartBeatBeaconDAOFactory.commitTransaction(); + } + } catch (HeartBeatDAOException e) { + String msg = "Error occurred while electing candidate for dynamic task execution."; + throw new HeartBeatManagementException(msg, e); + } catch (TransactionManagementException e) { + HeartBeatBeaconDAOFactory.rollbackTransaction(); + String msg = "Error occurred while electing candidate for dynamic task execution. Issue in opening a connection to the underlying data source"; + throw new HeartBeatManagementException(msg, e); + } finally { + HeartBeatBeaconDAOFactory.closeConnection(); + } + } else { + String msg = "Heart Beat Configuration Disabled. Error electing candidate for dynamic task execution."; + throw new HeartBeatManagementException(msg); + } + } + + private void electCandidate(Map servers) throws HeartBeatDAOException { + String electedCandidate = getRandomElement(servers.keySet()); + heartBeatDAO.recordElectedCandidate(electedCandidate); + } + + + private String getRandomElement(Set valueSet) + { + Random rand = new Random(); + List items = new ArrayList<>(valueSet); + return items.get(rand.nextInt(items.size())); + } + @Override public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException { diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/enforcement/DelegationTask.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/enforcement/DelegationTask.java index bb070f792c..7ffd6df4e2 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/enforcement/DelegationTask.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/enforcement/DelegationTask.java @@ -49,8 +49,7 @@ public class DelegationTask extends DynamicPartitionedScheduleTask { } @Override - public void execute() { - super.refreshContext(); + public void executeDynamicTask() { try { PolicyManager policyManager = new PolicyManagerImpl(); UpdatedPolicyDeviceListBean updatedPolicyDeviceList = policyManager.applyChangesMadeToPolicies(); @@ -70,7 +69,7 @@ public class DelegationTask extends DynamicPartitionedScheduleTask { try { devices = new ArrayList<>(); toBeNotified = new ArrayList<>(); - if(super.getTaskContext() != null && super.getTaskContext().isPartitioningEnabled()) { + if(super.isDynamicTaskEligible()) { devices.addAll(service.getAllocatedDevices(deviceType, super.getTaskContext().getActiveServerCount(), super.getTaskContext().getServerHashIndex())); diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java index 574f93f6e6..e539ad2c61 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/main/java/org/wso2/carbon/policy/mgt/core/task/MonitoringTask.java @@ -45,12 +45,10 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask { } @Override - public void execute() { - + public void executeDynamicTask() { if (log.isDebugEnabled()) { log.debug("Monitoring task started to run."); } - this.executeforAllTenants(); } @@ -96,7 +94,6 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask { } private void executeTask() { - super.refreshContext(); MonitoringManager monitoringManager = PolicyManagementDataHolder.getInstance().getMonitoringManager(); List deviceTypes = new ArrayList<>(); List configDeviceTypes = new ArrayList<>(); @@ -122,7 +119,7 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask { PolicyManagementDataHolder.getInstance().getDeviceManagementService() .getPolicyMonitoringManager(deviceType); List devices; - if(super.getTaskContext()!= null && super.getTaskContext().isPartitioningEnabled()){ + if(super.isDynamicTaskEligible()){ devices = deviceManagementProviderService.getAllocatedDevices(deviceType, super.getTaskContext().getActiveServerCount(), super.getTaskContext().getServerHashIndex()); diff --git a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/mock/TestHeartBeatManagementService.java b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/mock/TestHeartBeatManagementService.java index 8b9f711aa0..aa8b517717 100644 --- a/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/mock/TestHeartBeatManagementService.java +++ b/components/policy-mgt/org.wso2.carbon.policy.mgt.core/src/test/java/org/wso2/carbon/policy/mgt/core/mock/TestHeartBeatManagementService.java @@ -7,6 +7,11 @@ import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementServi import org.wso2.carbon.device.mgt.common.ServerCtxInfo; public class TestHeartBeatManagementService implements HeartBeatManagementService { + @Override + public boolean isTaskPartitioningEnabled() throws HeartBeatManagementException { + return false; + } + @Override public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException { return null; @@ -21,4 +26,20 @@ public class TestHeartBeatManagementService implements HeartBeatManagementServic public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException { return false; } + + @Override + public void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException { + + } + + @Override + public boolean updateTaskExecutionAcknowledgement(String newTask) + throws HeartBeatManagementException { + return false; + } + + @Override + public boolean isQualifiedToExecuteTask() throws HeartBeatManagementException { + return false; + } } diff --git a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/conf/heart-beat-config.xml b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/conf/heart-beat-config.xml index 8b3aa6a82a..814f2f1163 100644 --- a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/conf/heart-beat-config.xml +++ b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/conf/heart-beat-config.xml @@ -19,7 +19,7 @@ --> - true + false jdbc/HeartBeat_DS diff --git a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/h2.sql b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/h2.sql index 3f5ad9652e..2a68137287 100644 --- a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/h2.sql +++ b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/h2.sql @@ -9,3 +9,10 @@ CREATE TABLE IF NOT EXISTS SERVER_HEART_BEAT_EVENTS ( LAST_UPDATED_TIMESTAMP TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (ID) ); + +CREATE TABLE IF NOT EXISTS ELECTED_LEADER_META_INFO ( + UUID VARCHAR(100) NOT NULL, + ELECTED_TIME TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + ACKNOWLEDGED_TASK_LIST TEXT DEFAULT NULL, + PRIMARY KEY (UUID) +); diff --git a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/mssql.sql b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/mssql.sql index a56b9f981b..05c4713620 100644 --- a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/mssql.sql +++ b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/mssql.sql @@ -11,4 +11,9 @@ CREATE TABLE SERVER_HEART_BEAT_EVENTS ( LAST_UPDATED_TIMESTAMP DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (ID)); - +IF NOT EXISTS (SELECT * FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'[DBO].[ELECTED_LEADER_META_INFO]') AND TYPE IN (N'U')) +CREATE TABLE ELECTED_LEADER_META_INFO ( + UUID VARCHAR(100) NOT NULL, + ELECTED_TIME DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + ACKNOWLEDGED_TASK_LIST VARCHAR(MAX) DEFAULT NULL, + PRIMARY KEY (UUID)); diff --git a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/mysql.sql b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/mysql.sql index 3f5ad9652e..2a68137287 100644 --- a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/mysql.sql +++ b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/mysql.sql @@ -9,3 +9,10 @@ CREATE TABLE IF NOT EXISTS SERVER_HEART_BEAT_EVENTS ( LAST_UPDATED_TIMESTAMP TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (ID) ); + +CREATE TABLE IF NOT EXISTS ELECTED_LEADER_META_INFO ( + UUID VARCHAR(100) NOT NULL, + ELECTED_TIME TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + ACKNOWLEDGED_TASK_LIST TEXT DEFAULT NULL, + PRIMARY KEY (UUID) +); diff --git a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/oracle.sql b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/oracle.sql index f9ca7a4367..7d4649187f 100644 --- a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/oracle.sql +++ b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/oracle.sql @@ -11,3 +11,11 @@ CREATE TABLE SERVER_HEART_BEAT_EVENTS ( CONSTRAINT PK_SERVER_HEART_BEAT_EVENTS PRIMARY KEY (ID) ) / + +CREATE TABLE ELECTED_LEADER_META_INFO ( + UUID VARCHAR(100) NOT NULL, + ELECTED_TIME TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + ACKNOWLEDGED_TASK_LIST BLOB DEFAULT NULL, + CONSTRAINT PK_SERVER_HEART_BEAT_EVENTS PRIMARY KEY (UUID) +) +/ diff --git a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/postgresql.sql b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/postgresql.sql index 85081a385b..9b895c6f31 100644 --- a/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/postgresql.sql +++ b/features/heartbeat-management/io.entgra.server.heart.beat.feature/src/main/resources/dbscripts/heart-beat/postgresql.sql @@ -9,3 +9,10 @@ LAST_UPDATED_TIMESTAMP TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (ID) ); + +CREATE TABLE IF NOT EXISTS ELECTED_LEADER_META_INFO ( + UUID VARCHAR(100) NOT NULL, + ELECTED_TIME TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + ACKNOWLEDGED_TASK_LIST TEXT DEFAULT NULL, + PRIMARY KEY (UUID) +);