adding randomly assigned task functionality and other improvements

revert-70ac1926
Ace 4 years ago
parent 9d39197844
commit c6db9eeecc

@ -23,13 +23,14 @@ import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.application.mgt.common.exception.SubscriptionManagementException;
import org.wso2.carbon.device.application.mgt.common.services.SubscriptionManager;
import org.wso2.carbon.device.application.mgt.core.impl.SubscriptionManagerImpl;
import org.wso2.carbon.ntask.core.Task;
import org.wso2.carbon.device.mgt.core.task.impl.RandomlyAssignedScheduleTask;
import java.util.Map;
public class ScheduledAppSubscriptionCleanupTask implements Task {
public class ScheduledAppSubscriptionCleanupTask extends RandomlyAssignedScheduleTask {
private static Log log = LogFactory.getLog(ScheduledAppSubscriptionCleanupTask.class);
private SubscriptionManager subscriptionManager;
private static final String TASK_NAME = "SCHEDULE_APP_SUBSCRIPTION_CLEANUP";
@Override
public void setProperties(Map<String, String> properties) {
@ -37,18 +38,25 @@ public class ScheduledAppSubscriptionCleanupTask implements Task {
}
@Override
public void init() {
public void executeRandomlyAssignedTask() {
try {
if(super.isQualifiedToExecuteTask()) {
subscriptionManager.cleanScheduledSubscriptions();
}
} catch (SubscriptionManagementException e) {
log.error("Error occurred while cleaning up tasks.");
}
}
@Override
protected void setup() {
if (this.subscriptionManager == null) {
this.subscriptionManager = new SubscriptionManagerImpl();
}
}
@Override
public void execute() {
try {
subscriptionManager.cleanScheduledSubscriptions();
} catch (SubscriptionManagementException e) {
log.error("Error occurred while cleaning up tasks.");
}
public String getTaskName() {
return TASK_NAME;
}
}

@ -33,15 +33,17 @@ import org.wso2.carbon.device.application.mgt.common.services.SubscriptionManage
import org.wso2.carbon.device.application.mgt.core.impl.SubscriptionManagerImpl;
import org.wso2.carbon.device.application.mgt.core.util.Constants;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.ntask.core.Task;
import org.wso2.carbon.device.mgt.core.task.impl.RandomlyAssignedScheduleTask;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class ScheduledAppSubscriptionTask implements Task {
public class ScheduledAppSubscriptionTask extends RandomlyAssignedScheduleTask {
private static Log log = LogFactory.getLog(ScheduledAppSubscriptionTask.class);
private static final String TASK_NAME = "SCHEDULE_APP_SUBSCRIPTION";
private SubscriptionManager subscriptionManager;
private String subscribers;
private String subscriptionType;
@ -65,68 +67,75 @@ public class ScheduledAppSubscriptionTask implements Task {
}
@Override
public void init() {
if (this.subscriptionManager == null) {
this.subscriptionManager = new SubscriptionManagerImpl();
}
}
@Override
public void execute() {
try {
ScheduledSubscriptionDTO subscriptionDTO = subscriptionManager.getPendingScheduledSubscription(
this.taskName);
if (subscriptionDTO == null) {
log.error("Unable to execute the task. Task entry for [" + this.taskName + "] cannot be retrieved " +
"from the database.");
return;
}
if (StringUtils.isNotEmpty(this.subscribers)) {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext();
carbonContext.setTenantDomain(this.tenantDomain);
carbonContext.setTenantId(this.tenantId);
carbonContext.setUsername(this.subscriber);
public void executeRandomlyAssignedTask() {
if(super.isQualifiedToExecuteTask()) {
try {
ScheduledSubscriptionDTO subscriptionDTO = subscriptionManager.getPendingScheduledSubscription(
this.taskName);
if (subscriptionDTO == null) {
log.error("Unable to execute the task. Task entry for [" + this.taskName + "] cannot be retrieved " +
"from the database.");
return;
}
if (StringUtils.isNotEmpty(this.subscribers)) {
PrivilegedCarbonContext.startTenantFlow();
PrivilegedCarbonContext carbonContext = PrivilegedCarbonContext.getThreadLocalCarbonContext();
carbonContext.setTenantDomain(this.tenantDomain);
carbonContext.setTenantId(this.tenantId);
carbonContext.setUsername(this.subscriber);
if (this.subscriptionType.equals(SubscriptionType.DEVICE.toString())) {
List<DeviceIdentifier> deviceIdentifiers = new Gson().fromJson(this.subscribers,
new TypeToken<List<DeviceIdentifier>>() {
}.getType());
try {
subscriptionManager.performBulkAppOperation(this.application, deviceIdentifiers,
this.subscriptionType, this.action);
subscriptionDTO.setStatus(ExecutionStatus.EXECUTED);
} catch (ApplicationManagementException e) {
log.error(
"Error occurred while " + this.action + "ing application " + this.application
+ "to/from the following devices: " + this.subscribers, e);
subscriptionDTO.setStatus(ExecutionStatus.FAILED);
if (this.subscriptionType.equals(SubscriptionType.DEVICE.toString())) {
List<DeviceIdentifier> deviceIdentifiers = new Gson().fromJson(this.subscribers,
new TypeToken<List<DeviceIdentifier>>() {
}.getType());
try {
subscriptionManager.performBulkAppOperation(this.application, deviceIdentifiers,
this.subscriptionType, this.action);
subscriptionDTO.setStatus(ExecutionStatus.EXECUTED);
} catch (ApplicationManagementException e) {
log.error(
"Error occurred while " + this.action + "ing application " + this.application
+ "to/from the following devices: " + this.subscribers, e);
subscriptionDTO.setStatus(ExecutionStatus.FAILED);
}
} else {
List<String> subscriberList = Pattern.compile(",").splitAsStream(this.subscribers).collect(
Collectors.toList());
try {
subscriptionManager.performBulkAppOperation(this.application, subscriberList,
this.subscriptionType, this.action);
subscriptionDTO.setStatus(ExecutionStatus.EXECUTED);
} catch (ApplicationManagementException e) {
log.error(
"Error occurred while " + this.action + "ing application " + this.application
+ "to/from the following " + this.subscriptionType + "s: " + this.subscribers, e);
subscriptionDTO.setStatus(ExecutionStatus.FAILED);
}
}
} else {
List<String> subscriberList = Pattern.compile(",").splitAsStream(this.subscribers).collect(
Collectors.toList());
try {
subscriptionManager.performBulkAppOperation(this.application, subscriberList,
this.subscriptionType, this.action);
subscriptionDTO.setStatus(ExecutionStatus.EXECUTED);
} catch (ApplicationManagementException e) {
log.error(
"Error occurred while " + this.action + "ing application " + this.application
+ "to/from the following " + this.subscriptionType + "s: " + this.subscribers, e);
subscriptionDTO.setStatus(ExecutionStatus.FAILED);
}
log.warn(
"Subscriber list is empty. Therefore skipping scheduled task to " + this.action + "application "
+ this.application);
subscriptionDTO.setStatus(ExecutionStatus.FAILED);
}
} else {
log.warn(
"Subscriber list is empty. Therefore skipping scheduled task to " + this.action + "application "
+ this.application);
subscriptionDTO.setStatus(ExecutionStatus.FAILED);
subscriptionManager.updateScheduledSubscriptionStatus(subscriptionDTO.getId(), subscriptionDTO.getStatus());
} catch (SubscriptionManagementException e) {
log.error("Error occurred while executing the task: " + this.taskName, e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
subscriptionManager.updateScheduledSubscriptionStatus(subscriptionDTO.getId(), subscriptionDTO.getStatus());
} catch (SubscriptionManagementException e) {
log.error("Error occurred while executing the task: " + this.taskName, e);
} finally {
PrivilegedCarbonContext.endTenantFlow();
}
}
@Override
protected void setup() {
if (this.subscriptionManager == null) {
this.subscriptionManager = new SubscriptionManagerImpl();
}
}
@Override
public String getTaskName() {
return TASK_NAME;
}
}

@ -68,14 +68,13 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask {
}
@Override
public void execute() {
public void executeDynamicTask() {
List<OperationEnrolmentMapping> operationEnrolmentMappings;
List<EnrolmentInfo> enrolmentInfoTobeUpdated = new ArrayList<>();
Map<Integer, Long> lastActivities = null;
EnrolmentInfo enrolmentInfo;
DeviceIdentifier deviceIdentifier;
Device device;
super.refreshContext();
try {
operationEnrolmentMappings = this.getOperationEnrolmentMappings(super.getTaskContext());
if (operationEnrolmentMappings.size() > 0) {

@ -65,8 +65,7 @@ public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask {
}
@Override
public void execute() {
super.refreshContext();
public void executeDynamicTask() {
deviceManagementProviderService = DeviceManagementDataHolder.getInstance()
.getDeviceManagementProvider();
OperationMonitoringTaskConfig operationMonitoringTaskConfig = deviceManagementProviderService

@ -36,22 +36,12 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
@Override
public final void init() {
try {
ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo();
if(ctxInfo!=null){
boolean dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled();
if(dynamicTaskEnabled){
taskContext = new DynamicTaskContext();
updateContext(ctxInfo);
if(ctxInfo.getActiveServerCount() > 0){
taskContext.setPartitioningEnabled(true);
}
if(log.isDebugEnabled()){
log.debug("Initiating execution of dynamic task for server : " + taskContext.getServerHashIndex() +
" where active server count is : " + taskContext.getActiveServerCount() +
" partitioning task enabled : " + taskContext.isPartitioningEnabled());
}
taskContext.setPartitioningEnabled(true);
} else {
log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.");
log.info("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.");
}
} catch (HeartBeatManagementException e) {
log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function." , e);
@ -59,29 +49,56 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
setup();
}
public DynamicTaskContext refreshContext(){
try {
ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo();
if(ctxInfo != null) {
updateContext(ctxInfo);
} else {
log.info("Dynamic Task Context not present. Tasks will run on regular worker/manager mode.");
@Override
public final void execute() {
refreshContext();
executeDynamicTask();
}
public void refreshContext(){
if(taskContext != null && taskContext.isPartitioningEnabled()) {
try {
updateContext();
} catch (HeartBeatManagementException e) {
log.error("Error refreshing Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.", e);
}
} catch (HeartBeatManagementException e) {
log.error("Error refreshing Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.", e);
}
return taskContext;
}
private void updateContext(ServerCtxInfo ctxInfo) {
private void updateContext() throws HeartBeatManagementException {
ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo();
if(ctxInfo != null) {
populateContext(ctxInfo);
} else {
log.info("Dynamic Task Context not present. Tasks will run on regular worker/manager mode.");
}
}
private void populateContext(ServerCtxInfo ctxInfo) {
taskContext.setActiveServerCount(ctxInfo.getActiveServerCount());
taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx());
if(log.isDebugEnabled()){
log.debug("Initiating execution of dynamic task for server : " + taskContext.getServerHashIndex() +
" where active server count is : " + taskContext.getActiveServerCount() +
" partitioning task enabled : " + taskContext.isPartitioningEnabled());
}
}
protected abstract void setup();
protected abstract void executeDynamicTask();
public static DynamicTaskContext getTaskContext() {
return taskContext;
}
public static boolean isDynamicTaskEligible(){
if(taskContext != null && taskContext.isPartitioningEnabled()) {
return true;
} else {
return false;
}
}
}

@ -0,0 +1,76 @@
/*
* Copyright (c) 2020, Entgra (pvt) Ltd. (http://entgra.io) All Rights Reserved.
*
* Entgra (pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.core.task.impl;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder;
import org.wso2.carbon.ntask.core.Task;
public abstract class RandomlyAssignedScheduleTask implements Task {
private static final Log log = LogFactory.getLog(RandomlyAssignedScheduleTask.class);
private static String taskName = "UNSPECIFIED";
private static boolean qualifiedToExecuteTask = false;
private static boolean dynamicTaskEnabled = false;
@Override
public final void init() {
try {
dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled();
} catch (HeartBeatManagementException e) {
log.error("Error Instantiating Variables necessary for Randomly Assigned Task Scheduling." , e);
}
//This is done so that sub class extending this abstract class is forced to specify a task name.
taskName = getTaskName();
setup();
}
@Override
public final void execute() {
refreshContext();
executeRandomlyAssignedTask();
}
public void refreshContext(){
if(dynamicTaskEnabled) {
try {
qualifiedToExecuteTask = DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask();
log.info("## NODE Qualified to execute Randomly Assigned Task : " + taskName);
DeviceManagementDataHolder.getInstance().getHeartBeatService().updateTaskExecutionAcknowledgement(taskName);
} catch (HeartBeatManagementException e) {
log.error("Error refreshing Variables necessary for Randomly Assigned Scheduled Task. " +
"Dynamic Tasks will not function.", e);
}
}
}
protected abstract void setup();
protected abstract void executeRandomlyAssignedTask();
public static boolean isQualifiedToExecuteTask() {
return qualifiedToExecuteTask;
}
public abstract String getTaskName();
}

@ -7,6 +7,11 @@ import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementServi
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
public class TestHeartBeatManagementService implements HeartBeatManagementService {
@Override
public boolean isTaskPartitioningEnabled() throws HeartBeatManagementException {
return false;
}
@Override
public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException {
return null;
@ -21,4 +26,20 @@ public class TestHeartBeatManagementService implements HeartBeatManagementServic
public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException {
return false;
}
@Override
public void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException {
}
@Override
public boolean updateTaskExecutionAcknowledgement(String newTask)
throws HeartBeatManagementException {
return false;
}
@Override
public boolean isQualifiedToExecuteTask() throws HeartBeatManagementException {
return false;
}
}

@ -19,9 +19,11 @@
package io.entgra.server.bootup.heartbeat.beacon.dao;
import io.entgra.server.bootup.heartbeat.beacon.dao.exception.HeartBeatDAOException;
import io.entgra.server.bootup.heartbeat.beacon.dto.ElectedCandidate;
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import java.util.List;
import java.util.Map;
/**
@ -39,4 +41,12 @@ public interface HeartBeatDAO {
Map<String, ServerContext> getActiveServerDetails(int elapsedTimeInSeconds) throws HeartBeatDAOException;
boolean recordElectedCandidate(String serverUUID) throws HeartBeatDAOException;
void purgeCandidates() throws HeartBeatDAOException;
ElectedCandidate retrieveCandidate() throws HeartBeatDAOException;
boolean acknowledgeTask(String uuid, List<String> taskList) throws HeartBeatDAOException;
}

@ -22,8 +22,11 @@ import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory;
import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatDAO;
import io.entgra.server.bootup.heartbeat.beacon.dao.exception.HeartBeatDAOException;
import io.entgra.server.bootup.heartbeat.beacon.dao.util.HeartBeatBeaconDAOUtil;
import io.entgra.server.bootup.heartbeat.beacon.dto.ElectedCandidate;
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;
@ -32,6 +35,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@ -41,6 +45,8 @@ import java.util.concurrent.TimeUnit;
*/
public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
private static final Log log = LogFactory.getLog(GenericHeartBeatDAOImpl.class);
@Override
public String recordServerCtx(ServerContext ctx) throws HeartBeatDAOException {
PreparedStatement stmt = null;
@ -69,6 +75,83 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
return uuid;
}
@Override
public boolean recordElectedCandidate(String serverUUID) throws HeartBeatDAOException {
PreparedStatement stmt = null;
try {
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
String sql;
sql = "INSERT INTO ELECTED_LEADER_META_INFO(UUID) VALUES (?)";
stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
stmt.setString(1, serverUUID);
return stmt.executeUpdate() > 0;
} catch (SQLException e) {
throw new HeartBeatDAOException("Error occurred while persisting UUID of chosen " +
"elected dynamic task execution candidate : " + serverUUID , e);
} finally {
HeartBeatBeaconDAOUtil.cleanupResources(stmt, null);
}
}
@Override
public void purgeCandidates() throws HeartBeatDAOException {
Statement stmt = null;
try {
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
conn.setAutoCommit(false);
String sql = "TRUNCATE TABLE ELECTED_LEADER_META_INFO";
stmt = conn.createStatement();
stmt.execute(sql);
conn.commit();
} catch (SQLException e) {
throw new HeartBeatDAOException("Error occurred while truncating ELECTED_LEADER_META_INFO table.", e);
} finally {
HeartBeatBeaconDAOUtil.cleanupResources(stmt, null);
}
}
@Override
public ElectedCandidate retrieveCandidate() throws HeartBeatDAOException {
Statement stmt = null;
ResultSet resultSet = null;
ElectedCandidate candidate = null;
try {
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
String sql = "SELECT * from ELECTED_LEADER_META_INFO";
stmt = conn.createStatement();
resultSet = stmt.executeQuery(sql);
while (resultSet.next()){
candidate = HeartBeatBeaconDAOUtil.populateCandidate(resultSet);
}
} catch (SQLException e) {
throw new HeartBeatDAOException("Error occurred while retrieving meta information of elected candidate", e);
} finally {
HeartBeatBeaconDAOUtil.cleanupResources(stmt, resultSet);
}
return candidate;
}
@Override
public boolean acknowledgeTask(String uuid, List<String> taskList) throws HeartBeatDAOException {
PreparedStatement stmt = null;
try {
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
String sql;
sql = "UPDATE ELECTED_LEADER_META_INFO SET ACKNOWLEDGED_TASK_LIST = ? WHERE UUID = ?";
stmt = conn.prepareStatement(sql, new String[]{"UUID"});
stmt.setString(1, String.join(",", taskList));
stmt.setString(2, uuid);
return stmt.executeUpdate() > 0;
} catch (SQLException e) {
throw new HeartBeatDAOException("Error occurred while updating task list of elected server : '" +
uuid + "' and task list " + taskList, e);
} finally {
HeartBeatBeaconDAOUtil.cleanupResources(stmt, null);
}
}
@Override
public boolean recordHeatBeat(HeartBeatEvent event) throws HeartBeatDAOException {
PreparedStatement stmt = null;
@ -152,8 +235,7 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
"WHERE LAST_UPDATED_TIMESTAMP > ? " +
"ORDER BY UUID";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, elapsedTimeInSeconds);
stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)));
stmt.setTimestamp(1, new Timestamp(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)));
resultSet = stmt.executeQuery();
while (resultSet.next()) {
ctxList.put(resultSet.getString("UUID"), HeartBeatBeaconDAOUtil.populateContext(resultSet));

@ -18,6 +18,7 @@
package io.entgra.server.bootup.heartbeat.beacon.dao.util;
import io.entgra.server.bootup.heartbeat.beacon.dto.ElectedCandidate;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -27,6 +28,8 @@ import javax.sql.DataSource;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Hashtable;
/**
@ -59,6 +62,29 @@ public final class HeartBeatBeaconDAOUtil {
}
}
/**
* Cleanup resources used to transaction
*
* @param stmt Statement used
* @param rs Obtained results set
*/
public static void cleanupResources(Statement stmt, ResultSet rs) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
log.warn("Error occurred while closing result set", e);
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
log.warn("Error occurred while closing prepared statement", e);
}
}
}
/**
* Lookup datasource using name and jndi properties
*
@ -88,4 +114,15 @@ public final class HeartBeatBeaconDAOUtil {
ctx.setCarbonServerPort(resultSet.getInt("SERVER_PORT"));
return ctx;
}
public static ElectedCandidate populateCandidate(ResultSet resultSet) throws SQLException {
ElectedCandidate candidate = new ElectedCandidate();
candidate.setServerUUID(resultSet.getString("UUID"));
candidate.setTimeOfElection(resultSet.getTimestamp("ELECTED_TIME"));
String tasksList = resultSet.getString("ACKNOWLEDGED_TASK_LIST");
if(tasksList != null && !tasksList.isEmpty()){
candidate.setAcknowledgedTaskList(Arrays.asList(tasksList.split(",")));
}
return candidate;
}
}

@ -0,0 +1,54 @@
/*
* Copyright (c) 2020, Entgra Pvt Ltd. (http://www.wso2.org) All Rights Reserved.
*
* Entgra Pvt Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.entgra.server.bootup.heartbeat.beacon.dto;
import java.sql.Timestamp;
import java.util.List;
public class ElectedCandidate {
private String serverUUID;
private Timestamp timeOfElection;
private List<String> acknowledgedTaskList = null;
public List<String> getAcknowledgedTaskList() {
return acknowledgedTaskList;
}
public void setAcknowledgedTaskList(List<String> acknowledgedTaskList) {
this.acknowledgedTaskList = acknowledgedTaskList;
}
public String getServerUUID() {
return serverUUID;
}
public void setServerUUID(String serverUUID) {
this.serverUUID = serverUUID;
}
public Timestamp getTimeOfElection() {
return timeOfElection;
}
public void setTimeOfElection(Timestamp timeOfElection) {
this.timeOfElection = timeOfElection;
}
}

@ -47,7 +47,7 @@ public class HeartBeatBeaconComponent {
protected void activate(ComponentContext componentContext) {
try {
if (log.isDebugEnabled()) {
log.debug("Initializing email sender core bundle");
log.debug("Initializing heart beat management bundle");
}
this.registerHeartBeatServices(componentContext);
@ -88,7 +88,7 @@ public class HeartBeatBeaconComponent {
/* This is to avoid mobile device management component getting initialized before the underlying datasources
are registered */
if (log.isDebugEnabled()) {
log.debug("Data source service set to mobile service component");
log.debug("Data source service set to heart beat management component");
}
}

@ -57,13 +57,17 @@ public class HeartBeatExecutor {
uuid = HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().updateServerContext(ctx);
HeartBeatBeaconUtils.saveUUID(uuid);
}
int timeOutIntervalInSeconds = CONFIG.getServerTimeOutIntervalInSeconds();
int timeSkew = CONFIG.getTimeSkew();
int cumilativeTimeOut = timeOutIntervalInSeconds + timeSkew;
final String designatedUUID = uuid;
HeartBeatBeaconDataHolder.getInstance().setLocalServerUUID(designatedUUID);
Runnable periodicTask = new Runnable() {
public void run() {
try {
recordHeartBeat(designatedUUID);
} catch (HeartBeatManagementException e) {
electDynamicTaskExecutionCandidate(cumilativeTimeOut);
} catch (Exception e) {
log.error("Error while executing record heart beat task. This will result in schedule operation malfunction.", e);
}
}
@ -83,4 +87,9 @@ public class HeartBeatExecutor {
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().recordHeartBeat(new HeartBeatEvent(uuid));
}
static void electDynamicTaskExecutionCandidate(int cumilativeTimeOut) throws HeartBeatManagementException {
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().electCandidate(cumilativeTimeOut);
}
}

@ -18,6 +18,7 @@
package io.entgra.server.bootup.heartbeat.beacon.service;
import io.entgra.server.bootup.heartbeat.beacon.dto.ElectedCandidate;
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
@ -25,10 +26,18 @@ import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
public interface HeartBeatManagementService {
boolean isTaskPartitioningEnabled() throws HeartBeatManagementException;
ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException;
String updateServerContext(ServerContext ctx) throws HeartBeatManagementException;
boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException;
void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException;
boolean updateTaskExecutionAcknowledgement(String newTask) throws HeartBeatManagementException;
boolean isQualifiedToExecuteTask() throws HeartBeatManagementException;
}

@ -22,18 +22,29 @@ import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig;
import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory;
import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatDAO;
import io.entgra.server.bootup.heartbeat.beacon.dao.exception.HeartBeatDAOException;
import io.entgra.server.bootup.heartbeat.beacon.dto.ElectedCandidate;
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.server.bootup.heartbeat.beacon.internal.HeartBeatBeaconDataHolder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
import org.wso2.carbon.device.mgt.common.exceptions.TransactionManagementException;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class HeartBeatManagementServiceImpl implements HeartBeatManagementService {
private static final Log log = LogFactory.getLog(HeartBeatManagementServiceImpl.class);
private final HeartBeatDAO heartBeatDAO;
public HeartBeatManagementServiceImpl(){
@ -77,6 +88,19 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
return serverCtxInfo;
}
@Override
public boolean isTaskPartitioningEnabled() throws HeartBeatManagementException {
boolean enabled = false;
if(HeartBeatBeaconConfig.getInstance() != null){
enabled = HeartBeatBeaconConfig.getInstance().isEnabled();
} else {
String msg = "Issue instantiating heart beat config.";
throw new HeartBeatManagementException(msg);
}
return enabled;
}
@Override
public String updateServerContext(ServerContext ctx) throws HeartBeatManagementException {
String uuid = null;
@ -105,6 +129,124 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
return uuid;
}
@Override
public boolean isQualifiedToExecuteTask() throws HeartBeatManagementException {
boolean isQualified = false;
if(HeartBeatBeaconConfig.getInstance().isEnabled()) {
try {
String localServerUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
HeartBeatBeaconDAOFactory.openConnection();
ElectedCandidate candidate = heartBeatDAO.retrieveCandidate();
if(candidate != null && candidate.getServerUUID().equalsIgnoreCase(localServerUUID)){
isQualified = true;
if(log.isDebugEnabled()){
log.debug("Node : " + localServerUUID + " Qualified to execute randomly assigned task.");
}
}
} catch (HeartBeatDAOException e) {
String msg = "Error occurred while checking if server is qualified to execute randomly designated task.";
throw new HeartBeatManagementException(msg, e);
} catch (SQLException e) {
String msg = "Error occurred while opening a connection to the underlying data source";
throw new HeartBeatManagementException(msg, e);
} finally {
HeartBeatBeaconDAOFactory.closeConnection();
}
} else {
String msg = "Heart Beat Configuration Disabled. Error occurred while checking if server is qualified to execute randomly designated task.";
throw new HeartBeatManagementException(msg);
}
return isQualified;
}
@Override
public boolean updateTaskExecutionAcknowledgement(String newTask) throws HeartBeatManagementException {
boolean result = false;
if(HeartBeatBeaconConfig.getInstance().isEnabled()) {
try {
String serverUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
HeartBeatBeaconDAOFactory.beginTransaction();
ElectedCandidate candidate = heartBeatDAO.retrieveCandidate();
if(candidate != null && candidate.getServerUUID().equals(serverUUID)){
List<String> taskList = candidate.getAcknowledgedTaskList();
boolean taskExecuted = false;
for(String task : taskList){
if(task.equalsIgnoreCase(newTask)){
taskExecuted = true;
break;
}
}
if(!taskExecuted) {
taskList.add(newTask);
result = heartBeatDAO.acknowledgeTask(serverUUID, taskList);
HeartBeatBeaconDAOFactory.commitTransaction();
}
}
} catch (HeartBeatDAOException e) {
String msg = "Error occurred while updating acknowledged task.";
throw new HeartBeatManagementException(msg, e);
} catch (TransactionManagementException e) {
HeartBeatBeaconDAOFactory.rollbackTransaction();
String msg = "Error occurred while updating acknowledged task.. Issue in opening a connection to the underlying data source";
throw new HeartBeatManagementException(msg, e);
} finally {
HeartBeatBeaconDAOFactory.closeConnection();
}
} else {
String msg = "Heart Beat Configuration Disabled. Updating acknowledged task list failed.";
throw new HeartBeatManagementException(msg);
}
return result;
}
@Override
public void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException {
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
try {
HeartBeatBeaconDAOFactory.beginTransaction();
Map<String, ServerContext> servers = heartBeatDAO.getActiveServerDetails(elapsedTimeInSeconds);
if (servers != null && !servers.isEmpty()) {
ElectedCandidate presentCandidate = heartBeatDAO.retrieveCandidate();
if (presentCandidate != null &&
presentCandidate.getTimeOfElection().before(new Timestamp(System.currentTimeMillis()
- TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)))) {
heartBeatDAO.purgeCandidates();
electCandidate(servers);
} else {
electCandidate(servers);
}
HeartBeatBeaconDAOFactory.commitTransaction();
}
} catch (HeartBeatDAOException e) {
String msg = "Error occurred while electing candidate for dynamic task execution.";
throw new HeartBeatManagementException(msg, e);
} catch (TransactionManagementException e) {
HeartBeatBeaconDAOFactory.rollbackTransaction();
String msg = "Error occurred while electing candidate for dynamic task execution. Issue in opening a connection to the underlying data source";
throw new HeartBeatManagementException(msg, e);
} finally {
HeartBeatBeaconDAOFactory.closeConnection();
}
} else {
String msg = "Heart Beat Configuration Disabled. Error electing candidate for dynamic task execution.";
throw new HeartBeatManagementException(msg);
}
}
private void electCandidate(Map<String, ServerContext> servers) throws HeartBeatDAOException {
String electedCandidate = getRandomElement(servers.keySet());
heartBeatDAO.recordElectedCandidate(electedCandidate);
}
private String getRandomElement(Set<String> valueSet)
{
Random rand = new Random();
List<String> items = new ArrayList<>(valueSet);
return items.get(rand.nextInt(items.size()));
}
@Override
public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException {

@ -49,8 +49,7 @@ public class DelegationTask extends DynamicPartitionedScheduleTask {
}
@Override
public void execute() {
super.refreshContext();
public void executeDynamicTask() {
try {
PolicyManager policyManager = new PolicyManagerImpl();
UpdatedPolicyDeviceListBean updatedPolicyDeviceList = policyManager.applyChangesMadeToPolicies();
@ -70,7 +69,7 @@ public class DelegationTask extends DynamicPartitionedScheduleTask {
try {
devices = new ArrayList<>();
toBeNotified = new ArrayList<>();
if(super.getTaskContext() != null && super.getTaskContext().isPartitioningEnabled()) {
if(super.isDynamicTaskEligible()) {
devices.addAll(service.getAllocatedDevices(deviceType,
super.getTaskContext().getActiveServerCount(),
super.getTaskContext().getServerHashIndex()));

@ -45,12 +45,10 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask {
}
@Override
public void execute() {
public void executeDynamicTask() {
if (log.isDebugEnabled()) {
log.debug("Monitoring task started to run.");
}
this.executeforAllTenants();
}
@ -96,7 +94,6 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask {
}
private void executeTask() {
super.refreshContext();
MonitoringManager monitoringManager = PolicyManagementDataHolder.getInstance().getMonitoringManager();
List<String> deviceTypes = new ArrayList<>();
List<String> configDeviceTypes = new ArrayList<>();
@ -122,7 +119,7 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask {
PolicyManagementDataHolder.getInstance().getDeviceManagementService()
.getPolicyMonitoringManager(deviceType);
List<Device> devices;
if(super.getTaskContext()!= null && super.getTaskContext().isPartitioningEnabled()){
if(super.isDynamicTaskEligible()){
devices = deviceManagementProviderService.getAllocatedDevices(deviceType,
super.getTaskContext().getActiveServerCount(),
super.getTaskContext().getServerHashIndex());

@ -7,6 +7,11 @@ import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementServi
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
public class TestHeartBeatManagementService implements HeartBeatManagementService {
@Override
public boolean isTaskPartitioningEnabled() throws HeartBeatManagementException {
return false;
}
@Override
public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException {
return null;
@ -21,4 +26,20 @@ public class TestHeartBeatManagementService implements HeartBeatManagementServic
public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException {
return false;
}
@Override
public void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException {
}
@Override
public boolean updateTaskExecutionAcknowledgement(String newTask)
throws HeartBeatManagementException {
return false;
}
@Override
public boolean isQualifiedToExecuteTask() throws HeartBeatManagementException {
return false;
}
}

@ -19,7 +19,7 @@
-->
<HeartBeatBeaconConfig>
<Enable>true</Enable>
<Enable>false</Enable>
<DataSourceConfiguration>
<JndiLookupDefinition>
<Name>jdbc/HeartBeat_DS</Name>

@ -9,3 +9,10 @@ CREATE TABLE IF NOT EXISTS SERVER_HEART_BEAT_EVENTS (
LAST_UPDATED_TIMESTAMP TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (ID)
);
CREATE TABLE IF NOT EXISTS ELECTED_LEADER_META_INFO (
UUID VARCHAR(100) NOT NULL,
ELECTED_TIME TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
ACKNOWLEDGED_TASK_LIST TEXT DEFAULT NULL,
PRIMARY KEY (UUID)
);

@ -11,4 +11,9 @@ CREATE TABLE SERVER_HEART_BEAT_EVENTS (
LAST_UPDATED_TIMESTAMP DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (ID));
IF NOT EXISTS (SELECT * FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'[DBO].[ELECTED_LEADER_META_INFO]') AND TYPE IN (N'U'))
CREATE TABLE ELECTED_LEADER_META_INFO (
UUID VARCHAR(100) NOT NULL,
ELECTED_TIME DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
ACKNOWLEDGED_TASK_LIST VARCHAR(MAX) DEFAULT NULL,
PRIMARY KEY (UUID));

@ -9,3 +9,10 @@ CREATE TABLE IF NOT EXISTS SERVER_HEART_BEAT_EVENTS (
LAST_UPDATED_TIMESTAMP TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (ID)
);
CREATE TABLE IF NOT EXISTS ELECTED_LEADER_META_INFO (
UUID VARCHAR(100) NOT NULL,
ELECTED_TIME TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
ACKNOWLEDGED_TASK_LIST TEXT DEFAULT NULL,
PRIMARY KEY (UUID)
);

@ -11,3 +11,11 @@ CREATE TABLE SERVER_HEART_BEAT_EVENTS (
CONSTRAINT PK_SERVER_HEART_BEAT_EVENTS PRIMARY KEY (ID)
)
/
CREATE TABLE ELECTED_LEADER_META_INFO (
UUID VARCHAR(100) NOT NULL,
ELECTED_TIME TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
ACKNOWLEDGED_TASK_LIST BLOB DEFAULT NULL,
CONSTRAINT PK_SERVER_HEART_BEAT_EVENTS PRIMARY KEY (UUID)
)
/

@ -9,3 +9,10 @@
LAST_UPDATED_TIMESTAMP TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (ID)
);
CREATE TABLE IF NOT EXISTS ELECTED_LEADER_META_INFO (
UUID VARCHAR(100) NOT NULL,
ELECTED_TIME TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
ACKNOWLEDGED_TASK_LIST TEXT DEFAULT NULL,
PRIMARY KEY (UUID)
);

Loading…
Cancel
Save