Adding improvements to dynamic task allocation

revert-70ac1926
Ace 4 years ago
parent b1d95af5b0
commit 9d39197844

@ -831,10 +831,10 @@ public abstract class AbstractDeviceDAOImpl implements DeviceDAO {
" WHERE DEVICE_TYPE_ID = t.ID" +
" AND t.NAME = ?" +
" AND t.ID = d.DEVICE_TYPE_ID" +
" AND d.TENANT_ID = ?) d1" +
" AND d.TENANT_ID = ?) d1 " +
"WHERE d1.ID = e.DEVICE_ID" +
" AND TENANT_ID = ?" +
" AND MOD(d1.ID, ?) = ?" +
" AND MOD(d1.ID, ?) = ? " +
"ORDER BY e.DATE_OF_LAST_UPDATE DESC";
stmt = conn.prepareStatement(sql);

@ -342,6 +342,9 @@ public class OperationManagerImpl implements OperationManager {
Map<Integer, Device> enrolments = new HashMap<>();
for (Device device : devices) {
enrolments.put(device.getEnrolmentInfo().getId(), device);
if(log.isDebugEnabled()){
log.info("Adding operation for device Id : " + device.getDeviceIdentifier());
}
}
if (operationDto.getControl() ==
org.wso2.carbon.device.mgt.core.dto.operation.mgt.Operation.Control.NO_REPEAT) {

@ -55,6 +55,24 @@ public interface OperationMappingDAO {
long maxDuration, int deviceTypeId)
throws OperationManagementDAOException;
/**
* This method returns first pending/repeated operation available for each active enrolment of given device-type
* in a task partitioned execution scenario
* where the operation was created after the given timestamp.
*
* @param minDuration
* @param maxDuration
* @param deviceTypeId
* @param activeServerCount
* @param serverHashIndex
* @return
*/
List<OperationEnrolmentMapping> getFirstPendingOperationMappingsForActiveEnrolments(long minDuration,
long maxDuration, int deviceTypeId,
int activeServerCount, int serverHashIndex)
throws OperationManagementDAOException;
/**
* This method returns the timestamp of last completed Operation for each active enrolment of given device-type
* where the operation was completed after the given timestamp.
@ -67,4 +85,19 @@ public interface OperationMappingDAO {
Map<Integer, Long> getLastConnectedTimeForActiveEnrolments(long timeStamp, int deviceTypeId)
throws OperationManagementDAOException;
/**
* This method returns the timestamp of last completed Operation for each active enrolment of given device-type
* in a task partitioned execution scenario
* where the operation was completed after the given timestamp.
*
* @param timeStamp
* @param deviceTypeId
* @param activeServerCount
* @param serverHashIndex
* @return
*/
Map<Integer, Long> getLastConnectedTimeForActiveEnrolments(long timeStamp, int deviceTypeId, int activeServerCount, int serverHashIndex)
throws OperationManagementDAOException;
}

@ -227,6 +227,46 @@ public class OperationMappingDAOImpl implements OperationMappingDAO {
return enrolmentOperationMappingList;
}
@Override
public List<OperationEnrolmentMapping> getFirstPendingOperationMappingsForActiveEnrolments(long minDuration,
long maxDuration, int deviceTypeId,
int activeServerCount, int serverHashIndex) throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;
List<OperationEnrolmentMapping> enrolmentOperationMappingList;
try {
Connection conn = OperationManagementDAOFactory.getConnection();
//We are specifically looking for operation mappings in 'Pending' & 'Repeated' states. Further we want
//devices to be active at that moment. Hence filtering by 'ACTIVE' & 'UNREACHABLE' device states.
String sql = "SELECT ENROLMENT_ID, D.DEVICE_IDENTIFICATION AS DEVICE_IDENTIFIER, MIN(CREATED_TIMESTAMP) " +
"AS CREATED_TIMESTAMP, E.STATUS AS ENROLMENT_STATUS, E.TENANT_ID FROM " +
"DM_ENROLMENT_OP_MAPPING OP INNER JOIN DM_ENROLMENT E ON OP.ENROLMENT_ID = E.ID INNER JOIN " +
"DM_DEVICE D ON E.DEVICE_ID = D.ID WHERE " +
"OP.STATUS IN ('"+ Operation.Status.PENDING.name() + "','" + Operation.Status.REPEATED.name() + "') " +
"AND OP.CREATED_TIMESTAMP BETWEEN ? AND ? AND E.STATUS IN ('" + EnrolmentInfo.Status.ACTIVE.name() +
"','" + EnrolmentInfo.Status.UNREACHABLE.name() + "') AND D.DEVICE_TYPE_ID = ? AND MOD(D.ID, ?) = ? GROUP BY ENROLMENT_ID," +
" D.DEVICE_IDENTIFICATION, E.STATUS, E.TENANT_ID";
stmt = conn.prepareStatement(sql);
stmt.setLong(1, maxDuration);
stmt.setLong(2, minDuration);
stmt.setInt(3, deviceTypeId);
stmt.setInt(4, activeServerCount);
stmt.setInt(5, serverHashIndex);
rs = stmt.executeQuery();
enrolmentOperationMappingList = new ArrayList<>();
while (rs.next()) {
OperationEnrolmentMapping enrolmentOperationMapping = this.getEnrolmentOpMapping(rs);
enrolmentOperationMappingList.add(enrolmentOperationMapping);
}
} catch (SQLException e) {
throw new OperationManagementDAOException("Error occurred while fetching pending operation mappings for " +
"active devices of type '" + deviceTypeId + "'", e);
} finally {
OperationManagementDAOUtil.cleanupResources(stmt, rs);
}
return enrolmentOperationMappingList;
}
@Override
public Map<Integer, Long> getLastConnectedTimeForActiveEnrolments(long timeStamp, int deviceTypeId) throws OperationManagementDAOException {
PreparedStatement stmt = null;
@ -259,6 +299,40 @@ public class OperationMappingDAOImpl implements OperationMappingDAO {
return lastConnectedTimeMap;
}
@Override
public Map<Integer, Long> getLastConnectedTimeForActiveEnrolments(long timeStamp, int deviceTypeId, int activeServerCount, int serverHashIndex) throws OperationManagementDAOException {
PreparedStatement stmt = null;
ResultSet rs = null;
Map<Integer, Long> lastConnectedTimeMap = null;
try {
Connection conn = OperationManagementDAOFactory.getConnection();
//We are specifically looking for operation mappings in 'Pending' & 'Repeated' states. Further we want
//devices to be active at that moment. Hence filtering by 'ACTIVE' & 'UNREACHABLE' device states.
String sql = "SELECT OP.ENROLMENT_ID AS EID, MAX(OP.UPDATED_TIMESTAMP) AS LAST_CONNECTED_TIME FROM " +
"DM_ENROLMENT_OP_MAPPING OP INNER JOIN DM_ENROLMENT E ON OP.ENROLMENT_ID = E.ID INNER JOIN " +
"DM_DEVICE D ON E.DEVICE_ID = D.ID WHERE " +
"OP.STATUS = '" + Operation.Status.COMPLETED.name() + "'" +
"AND OP.UPDATED_TIMESTAMP >= ? AND E.STATUS IN ('" + EnrolmentInfo.Status.ACTIVE.name() +
"','" + EnrolmentInfo.Status.UNREACHABLE.name() + "') AND D.DEVICE_TYPE_ID = ? AND MOD(D.ID, ?) = ? GROUP BY ENROLMENT_ID";
stmt = conn.prepareStatement(sql);
stmt.setLong(1, timeStamp);
stmt.setInt(2, deviceTypeId);
stmt.setInt(3, activeServerCount);
stmt.setInt(4, serverHashIndex);
rs = stmt.executeQuery();
lastConnectedTimeMap = new HashMap<>();
while (rs.next()) {
lastConnectedTimeMap.put(rs.getInt("EID"), rs.getLong("LAST_CONNECTED_TIME"));
}
} catch (SQLException e) {
throw new OperationManagementDAOException("Error occurred while fetching last connected time for " +
"active devices of type '" + deviceTypeId + "'", e);
} finally {
OperationManagementDAOUtil.cleanupResources(stmt, rs);
}
return lastConnectedTimeMap;
}
private OperationEnrolmentMapping getEnrolmentOpMapping(ResultSet rs) throws SQLException {
OperationEnrolmentMapping enrolmentOperationMapping = new OperationEnrolmentMapping();
enrolmentOperationMapping.setEnrolmentId(rs.getInt("ENROLMENT_ID"));

@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DeviceStatusTaskPluginConfig;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.EnrolmentInfo;
import org.wso2.carbon.device.mgt.common.exceptions.TransactionManagementException;
import org.wso2.carbon.device.mgt.core.cache.impl.DeviceCacheManagerImpl;
@ -33,6 +34,7 @@ import org.wso2.carbon.device.mgt.core.operation.mgt.OperationEnrolmentMapping;
import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationManagementDAOException;
import org.wso2.carbon.device.mgt.core.operation.mgt.dao.OperationManagementDAOFactory;
import org.wso2.carbon.device.mgt.core.status.task.DeviceStatusTaskException;
import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask;
import org.wso2.carbon.ntask.core.Task;
import java.sql.SQLException;
@ -44,7 +46,7 @@ import java.util.Map;
* This implements the Task service which monitors the device activity periodically & update the device-status if
* necessary.
*/
public class DeviceStatusMonitoringTask implements Task {
public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask {
private static final Log log = LogFactory.getLog(DeviceStatusMonitoringTask.class);
private String deviceType;
@ -61,7 +63,7 @@ public class DeviceStatusMonitoringTask implements Task {
}
@Override
public void init() {
protected void setup() {
}
@ -73,10 +75,11 @@ public class DeviceStatusMonitoringTask implements Task {
EnrolmentInfo enrolmentInfo;
DeviceIdentifier deviceIdentifier;
Device device;
super.refreshContext();
try {
operationEnrolmentMappings = this.getOperationEnrolmentMappings();
operationEnrolmentMappings = this.getOperationEnrolmentMappings(super.getTaskContext());
if (operationEnrolmentMappings.size() > 0) {
lastActivities = this.getLastDeviceActivities();
lastActivities = this.getLastDeviceActivities(super.getTaskContext());
}
} catch (DeviceStatusTaskException e) {
log.error("Error occurred while fetching OperationEnrolment mappings of deviceType '" + deviceType + "'", e);
@ -104,6 +107,9 @@ public class DeviceStatusMonitoringTask implements Task {
DeviceCacheManagerImpl.getInstance().addDeviceToCache(deviceIdentifier, device, mapping.getTenantId());
}
enrolmentInfoTobeUpdated.add(enrolmentInfo);
if(log.isDebugEnabled()){
log.debug("Enrollment Information updated for device ID : " + device.getDeviceIdentifier());
}
}
}
@ -163,13 +169,21 @@ public class DeviceStatusMonitoringTask implements Task {
return updateStatus;
}
private List<OperationEnrolmentMapping> getOperationEnrolmentMappings() throws DeviceStatusTaskException {
private List<OperationEnrolmentMapping> getOperationEnrolmentMappings(DynamicTaskContext ctx) throws DeviceStatusTaskException {
List<OperationEnrolmentMapping> operationEnrolmentMappings;
try {
OperationManagementDAOFactory.openConnection();
if(ctx != null && ctx.isPartitioningEnabled()){
operationEnrolmentMappings = OperationManagementDAOFactory.
getOperationMappingDAO().getFirstPendingOperationMappingsForActiveEnrolments(this.getMinTimeWindow(),
this.getMaxTimeWindow(), this.deviceTypeId,
ctx.getActiveServerCount(), ctx.getServerHashIndex());
} else {
operationEnrolmentMappings = OperationManagementDAOFactory.
getOperationMappingDAO().getFirstPendingOperationMappingsForActiveEnrolments(this.getMinTimeWindow(),
this.getMaxTimeWindow(), this.deviceTypeId);
}
} catch (SQLException e) {
throw new DeviceStatusTaskException("Error occurred while getting Enrolment operation mappings for " +
"determining device status of deviceType '" + deviceType + "'", e);
@ -182,13 +196,20 @@ public class DeviceStatusMonitoringTask implements Task {
return operationEnrolmentMappings;
}
private Map<Integer, Long> getLastDeviceActivities() throws DeviceStatusTaskException {
private Map<Integer, Long> getLastDeviceActivities(DynamicTaskContext ctx) throws DeviceStatusTaskException {
Map<Integer, Long> lastActivities;
try {
OperationManagementDAOFactory.openConnection();
if(ctx != null && ctx.isPartitioningEnabled()) {
lastActivities = OperationManagementDAOFactory.
getOperationMappingDAO().getLastConnectedTimeForActiveEnrolments(this.getMaxTimeWindow(),
this.deviceTypeId,
ctx.getActiveServerCount(), ctx.getServerHashIndex());
} else {
lastActivities = OperationManagementDAOFactory.
getOperationMappingDAO().getLastConnectedTimeForActiveEnrolments(this.getMaxTimeWindow(),
this.deviceTypeId);
}
} catch (SQLException e) {
throw new DeviceStatusTaskException("Error occurred while getting last activities for " +
"determining device status of deviceType '" + deviceType + "'", e);

@ -66,6 +66,7 @@ public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask {
@Override
public void execute() {
super.refreshContext();
deviceManagementProviderService = DeviceManagementDataHolder.getInstance()
.getDeviceManagementProvider();
OperationMonitoringTaskConfig operationMonitoringTaskConfig = deviceManagementProviderService

@ -39,8 +39,7 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo();
if(ctxInfo!=null){
taskContext = new DynamicTaskContext();
taskContext.setActiveServerCount(ctxInfo.getActiveServerCount());
taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx());
updateContext(ctxInfo);
if(ctxInfo.getActiveServerCount() > 0){
taskContext.setPartitioningEnabled(true);
@ -60,6 +59,25 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
setup();
}
public DynamicTaskContext refreshContext(){
try {
ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo();
if(ctxInfo != null) {
updateContext(ctxInfo);
} else {
log.info("Dynamic Task Context not present. Tasks will run on regular worker/manager mode.");
}
} catch (HeartBeatManagementException e) {
log.error("Error refreshing Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.", e);
}
return taskContext;
}
private void updateContext(ServerCtxInfo ctxInfo) {
taskContext.setActiveServerCount(ctxInfo.getActiveServerCount());
taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx());
}
protected abstract void setup();
public static DynamicTaskContext getTaskContext() {

@ -0,0 +1,24 @@
package org.wso2.carbon.device.mgt.core;
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
public class TestHeartBeatManagementService implements HeartBeatManagementService {
@Override
public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException {
return null;
}
@Override
public String updateServerContext(ServerContext ctx) throws HeartBeatManagementException {
return null;
}
@Override
public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException {
return false;
}
}

@ -17,6 +17,7 @@
*/
package org.wso2.carbon.device.mgt.core.task;
import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.powermock.api.mockito.PowerMockito;
@ -32,6 +33,7 @@ import org.wso2.carbon.device.mgt.common.operation.mgt.OperationManagementExcept
import org.wso2.carbon.device.mgt.common.operation.mgt.OperationManager;
import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService;
import org.wso2.carbon.device.mgt.core.TestDeviceManagementService;
import org.wso2.carbon.device.mgt.core.TestHeartBeatManagementService;
import org.wso2.carbon.device.mgt.core.TestUtils;
import org.wso2.carbon.device.mgt.core.authorization.DeviceAccessAuthorizationServiceImpl;
import org.wso2.carbon.device.mgt.core.common.BaseDeviceManagementTest;
@ -83,6 +85,9 @@ public class DeviceTaskManagerTest extends BaseDeviceManagementTest {
DeviceManagementDataHolder.getInstance().setDeviceTaskManagerService(null);
DeviceManagementService deviceManagementService = new TestDeviceManagementService(
TestDataHolder.TEST_DEVICE_TYPE, MultitenantConstants.SUPER_TENANT_DOMAIN_NAME);
HeartBeatManagementService heartBeatManagementService = new TestHeartBeatManagementService();
DeviceManagementDataHolder.getInstance()
.setHeartBeatService(heartBeatManagementService);
this.operationManager = PowerMockito.spy(
new OperationManagerImpl(TestDataHolder.TEST_DEVICE_TYPE, deviceManagementService));
try {

@ -18,6 +18,7 @@
package io.entgra.server.bootup.heartbeat.beacon;
import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -29,10 +30,16 @@ import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Hashtable;
import java.util.Properties;
public class HeartBeatBeaconUtils {
@ -83,6 +90,33 @@ public class HeartBeatBeaconUtils {
return ctx;
}
public static void saveUUID(String text) throws IOException {
File serverDetails = new File(HeartBeatBeaconConfig.getInstance().getServerUUIDFileLocation());
serverDetails.createNewFile(); // if file already exists will do nothing
FileOutputStream fos = new FileOutputStream(serverDetails, false);
Properties prop = new Properties();
prop.setProperty("server.uuid", text);
prop.store(fos, null);
fos.close();
}
public static String readUUID() {
InputStream input = null;
String uuid = null;
try {
input = new FileInputStream(HeartBeatBeaconConfig.getInstance().getServerUUIDFileLocation());
Properties props = new Properties();
props.load(input);
uuid = props.getProperty("server.uuid");
input.close();
} catch (FileNotFoundException e) {
log.info("File : server-credentials.properties does not exist, new UUID will be generated for server.");
} catch (IOException e) {
log.error("Error accessing server-credentials.properties to locate server.uuid.", e);
}
return uuid;
}
}

@ -47,6 +47,9 @@ public class HeartBeatBeaconConfig {
private static final String HEART_BEAT_NOTIFIER_CONFIG_PATH =
CarbonUtils.getCarbonConfigDirPath() + File.separator + "heart-beat-config.xml";
private static final String SERVER_UUID_FILE_LOCATION =
CarbonUtils.getCarbonConfigDirPath() + File.separator + "server-credentials.properties";
private HeartBeatBeaconConfig() {
}
@ -112,6 +115,10 @@ public class HeartBeatBeaconConfig {
this.enabled = enabled;
}
public String getServerUUIDFileLocation(){
return SERVER_UUID_FILE_LOCATION;
}
public static void init() throws HeartBeatBeaconConfigurationException {
try {
File emailSenderConfig = new File(HEART_BEAT_NOTIFIER_CONFIG_PATH);

@ -22,7 +22,6 @@ import io.entgra.server.bootup.heartbeat.beacon.dao.exception.HeartBeatDAOExcept
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import java.util.List;
import java.util.Map;
/**
@ -34,6 +33,8 @@ public interface HeartBeatDAO {
boolean recordHeatBeat(HeartBeatEvent event) throws HeartBeatDAOException;
boolean checkUUIDValidity(String uuid) throws HeartBeatDAOException;
String retrieveExistingServerCtx(ServerContext ctx) throws HeartBeatDAOException;
Map<String, ServerContext> getActiveServerDetails(int elapsedTimeInSeconds) throws HeartBeatDAOException;

@ -30,11 +30,11 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* This class represents implementation of HeartBeatDAO
@ -89,6 +89,30 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
}
}
@Override
public boolean checkUUIDValidity(String uuid) throws HeartBeatDAOException {
PreparedStatement stmt = null;
ResultSet resultSet = null;
boolean result = false;
try {
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
String sql = "SELECT ID FROM SERVER_HEART_BEAT_EVENTS WHERE UUID = ?";
stmt = conn.prepareStatement(sql);
stmt.setString(1, uuid);
resultSet = stmt.executeQuery();
if(resultSet.next()){
result = true;
}
} catch (SQLException e) {
throw new HeartBeatDAOException("Error occurred checking existense of UUID" + uuid +
" amongst heartbeat meta info ", e);
} finally {
HeartBeatBeaconDAOUtil.cleanupResources(stmt, resultSet);
}
return result;
}
@Override
public String retrieveExistingServerCtx(ServerContext ctx) throws HeartBeatDAOException {
PreparedStatement stmt = null;
@ -116,7 +140,8 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
}
@Override
public Map<String, ServerContext> getActiveServerDetails(int elapsedTimeInSeconds) throws HeartBeatDAOException {
public Map<String, ServerContext> getActiveServerDetails(int elapsedTimeInSeconds)
throws HeartBeatDAOException {
PreparedStatement stmt = null;
ResultSet resultSet = null;
Map<String, ServerContext> ctxList = new HashMap<>();
@ -124,10 +149,11 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
String sql = "SELECT (@row_number:=@row_number + 1) AS IDX, UUID, HOST_NAME, SERVER_PORT from " +
"SERVER_HEART_BEAT_EVENTS, (SELECT @row_number:=-1) AS TEMP " +
"WHERE LAST_UPDATED_TIMESTAMP > DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? SECOND) " +
"WHERE LAST_UPDATED_TIMESTAMP > ? " +
"ORDER BY UUID";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, elapsedTimeInSeconds);
stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)));
resultSet = stmt.executeQuery();
while (resultSet.next()) {
ctxList.put(resultSet.getString("UUID"), HeartBeatBeaconDAOUtil.populateContext(resultSet));

@ -18,8 +18,8 @@
package io.entgra.server.bootup.heartbeat.beacon.internal;
import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig;
import io.entgra.server.bootup.heartbeat.beacon.HeartBeatBeaconUtils;
import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig;
import io.entgra.server.bootup.heartbeat.beacon.config.datasource.DataSourceConfig;
import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory;
import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
@ -59,7 +59,7 @@ public class HeartBeatBeaconComponent {
HeartBeatBeaconDAOFactory.init(dsConfig);
//Setting up executors to notify heart beat status */
HeartBeatInternalUtils.setUpNotifiers(HeartBeatBeaconUtils.getServerDetails());
HeartBeatExecutor.setUpNotifiers(HeartBeatBeaconUtils.getServerDetails());
}
if (log.isDebugEnabled()) {

@ -18,22 +18,23 @@
package io.entgra.server.bootup.heartbeat.beacon.internal;
import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig;
import io.entgra.server.bootup.heartbeat.beacon.HeartBeatBeaconConfigurationException;
import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory;
import io.entgra.server.bootup.heartbeat.beacon.HeartBeatBeaconUtils;
import io.entgra.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig;
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class HeartBeatInternalUtils {
public class HeartBeatExecutor {
private static Log log = LogFactory.getLog(HeartBeatInternalUtils.class);
private static Log log = LogFactory.getLog(HeartBeatExecutor.class);
private static final int DEFAULT__NOTIFIER_INTERVAL = 5;
private static final int DEFAULT_NOTIFIER_DELAY = 5;
private static HeartBeatBeaconConfig CONFIG;
@ -51,12 +52,17 @@ public class HeartBeatInternalUtils {
}
try {
String uuid = HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().updateServerContext(ctx);
HeartBeatBeaconDataHolder.getInstance().setLocalServerUUID(uuid);
String uuid = HeartBeatBeaconUtils.readUUID();
if(uuid == null){
uuid = HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().updateServerContext(ctx);
HeartBeatBeaconUtils.saveUUID(uuid);
}
final String designatedUUID = uuid;
HeartBeatBeaconDataHolder.getInstance().setLocalServerUUID(designatedUUID);
Runnable periodicTask = new Runnable() {
public void run() {
try {
recordHeartBeat(uuid);
recordHeartBeat(designatedUUID);
} catch (HeartBeatManagementException e) {
log.error("Error while executing record heart beat task. This will result in schedule operation malfunction.", e);
}
@ -68,6 +74,8 @@ public class HeartBeatInternalUtils {
TimeUnit.SECONDS);
} catch (HeartBeatManagementException e) {
throw new HeartBeatBeaconConfigurationException("Error occured while updating initial server context.", e);
} catch (IOException e) {
throw new HeartBeatBeaconConfigurationException("Error while persisting UUID of server.", e);
}
}

@ -19,8 +19,8 @@
package io.entgra.server.bootup.heartbeat.beacon.service;
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
public interface HeartBeatManagementService {

@ -23,8 +23,8 @@ import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory;
import io.entgra.server.bootup.heartbeat.beacon.dao.HeartBeatDAO;
import io.entgra.server.bootup.heartbeat.beacon.dao.exception.HeartBeatDAOException;
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.server.bootup.heartbeat.beacon.internal.HeartBeatBeaconDataHolder;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
import org.wso2.carbon.device.mgt.common.exceptions.TransactionManagementException;
@ -34,17 +34,21 @@ import java.util.Map;
public class HeartBeatManagementServiceImpl implements HeartBeatManagementService {
private final HeartBeatDAO heartBeatDAO;
public HeartBeatManagementServiceImpl(){
this.heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO();
}
@Override
public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException {
HeartBeatDAO heartBeatDAO;
int hashIndex = -1;
ServerContext localServerCtx = null;
ServerCtxInfo serverCtxInfo = null;
if(HeartBeatBeaconConfig.getInstance().isEnabled()) {
try {
HeartBeatBeaconDAOFactory.openConnection();
heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO();
int timeOutIntervalInSeconds = HeartBeatBeaconConfig.getInstance().getServerTimeOutIntervalInSeconds();
int timeSkew = HeartBeatBeaconConfig.getInstance().getTimeSkew();
int cumilativeTimeOut = timeOutIntervalInSeconds + timeSkew;
@ -75,13 +79,10 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
@Override
public String updateServerContext(ServerContext ctx) throws HeartBeatManagementException {
HeartBeatDAO heartBeatDAO;
String uuid = null;
if(HeartBeatBeaconConfig.getInstance().isEnabled()) {
try {
HeartBeatBeaconDAOFactory.beginTransaction();
heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO();
uuid = heartBeatDAO.retrieveExistingServerCtx(ctx);
if (uuid == null) {
uuid = heartBeatDAO.recordServerCtx(ctx);
@ -107,14 +108,17 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
@Override
public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException {
HeartBeatDAO heartBeatDAO;
boolean operationSuccess = false;
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
try {
HeartBeatBeaconDAOFactory.beginTransaction();
heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO();
if(heartBeatDAO.checkUUIDValidity(event.getServerUUID())){
operationSuccess = heartBeatDAO.recordHeatBeat(event);
HeartBeatBeaconDAOFactory.commitTransaction();
} else {
String msg = "Server UUID Does not exist, heartbeat not recorded.";
throw new HeartBeatManagementException(msg);
}
} catch (HeartBeatDAOException e) {
String msg = "Error occurred while recording heart beat.";
throw new HeartBeatManagementException(msg, e);

@ -18,6 +18,7 @@
package org.wso2.carbon.policy.mgt.common;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.policy.mgt.Policy;
import org.wso2.carbon.device.mgt.common.policy.mgt.Profile;

@ -19,6 +19,7 @@
package org.wso2.carbon.policy.mgt.core.cache;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.policy.mgt.Policy;
import org.wso2.carbon.policy.mgt.common.PolicyManagementException;

@ -21,6 +21,7 @@ package org.wso2.carbon.policy.mgt.core.cache.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.policy.mgt.Policy;
import org.wso2.carbon.policy.mgt.common.PolicyManagementException;
import org.wso2.carbon.policy.mgt.core.cache.PolicyCacheManager;

@ -21,6 +21,7 @@ package org.wso2.carbon.policy.mgt.core.dao.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.policy.mgt.Profile;
import org.wso2.carbon.policy.mgt.core.dao.PolicyManagementDAOFactory;
import org.wso2.carbon.policy.mgt.core.dao.ProfileDAO;

@ -26,7 +26,7 @@ import org.wso2.carbon.device.mgt.common.EnrolmentInfo;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
import org.wso2.carbon.device.mgt.core.config.policy.PolicyConfiguration;
import org.wso2.carbon.device.mgt.core.service.DeviceManagementProviderService;
import org.wso2.carbon.ntask.core.Task;
import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask;
import org.wso2.carbon.policy.mgt.common.PolicyManagementException;
import org.wso2.carbon.policy.mgt.core.cache.impl.PolicyCacheManagerImpl;
import org.wso2.carbon.policy.mgt.core.internal.PolicyManagementDataHolder;
@ -38,7 +38,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class DelegationTask implements Task {
public class DelegationTask extends DynamicPartitionedScheduleTask {
private static final Log log = LogFactory.getLog(DelegationTask.class);
private PolicyConfiguration policyConfiguration = DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getPolicyConfiguration();
@ -48,14 +48,9 @@ public class DelegationTask implements Task {
}
@Override
public void init() {
}
@Override
public void execute() {
super.refreshContext();
try {
PolicyManager policyManager = new PolicyManagerImpl();
UpdatedPolicyDeviceListBean updatedPolicyDeviceList = policyManager.applyChangesMadeToPolicies();
@ -75,7 +70,13 @@ public class DelegationTask implements Task {
try {
devices = new ArrayList<>();
toBeNotified = new ArrayList<>();
if(super.getTaskContext() != null && super.getTaskContext().isPartitioningEnabled()) {
devices.addAll(service.getAllocatedDevices(deviceType,
super.getTaskContext().getActiveServerCount(),
super.getTaskContext().getServerHashIndex()));
} else {
devices.addAll(service.getAllDevices(deviceType, false));
}
//HashMap<Integer, Integer> deviceIdPolicy = policyManager.getAppliedPolicyIdsDeviceIds();
for (Device device : devices) {
// if (deviceIdPolicy.containsKey(device.getId())) {
@ -84,6 +85,9 @@ public class DelegationTask implements Task {
toBeNotified.add(device);
}
// }
if(log.isDebugEnabled()){
log.debug("Adding policy operation to device : " + device.getDeviceIdentifier());
}
}
if (!toBeNotified.isEmpty()) {
PolicyEnforcementDelegator enforcementDelegator = new PolicyEnforcementDelegatorImpl
@ -102,4 +106,9 @@ public class DelegationTask implements Task {
log.error("Error occurred while getting the policies applied to devices.", e);
}
}
@Override
protected void setup() {
}
}

@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.policy.mgt.Policy;
import org.wso2.carbon.device.mgt.common.policy.mgt.Profile;
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;

@ -19,6 +19,7 @@ package org.wso2.carbon.policy.mgt.core.mgt;
import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.policy.mgt.Policy;
import org.wso2.carbon.policy.mgt.common.PolicyManagementException;
import org.wso2.carbon.policy.mgt.core.mgt.bean.UpdatedPolicyDeviceListBean;

@ -39,6 +39,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.exceptions.DeviceManagementException;
import org.wso2.carbon.device.mgt.common.exceptions.InvalidDeviceException;
import org.wso2.carbon.device.mgt.common.group.mgt.DeviceGroup;

@ -20,6 +20,7 @@ package org.wso2.carbon.policy.mgt.core.mgt.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
import org.wso2.carbon.device.mgt.common.policy.mgt.Profile;
import org.wso2.carbon.device.mgt.common.policy.mgt.ProfileFeature;
import org.wso2.carbon.device.mgt.core.dao.DeviceManagementDAOFactory;

@ -96,7 +96,7 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask {
}
private void executeTask() {
super.refreshContext();
MonitoringManager monitoringManager = PolicyManagementDataHolder.getInstance().getMonitoringManager();
List<String> deviceTypes = new ArrayList<>();
List<String> configDeviceTypes = new ArrayList<>();
@ -142,6 +142,9 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask {
status.equals(EnrolmentInfo.Status.UNREACHABLE)) {
notifiableDevices.add(device);
}
if(log.isDebugEnabled()){
log.debug("Adding monitoring operation to device : " + device.getDeviceIdentifier());
}
}
if (log.isDebugEnabled()) {
log.debug("Following '" + deviceType + "' devices selected to send the notification " +

@ -17,6 +17,7 @@
*/
package org.wso2.carbon.policy.mgt.core;
import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.testng.Assert;
@ -51,6 +52,7 @@ import org.wso2.carbon.policy.mgt.core.enforcement.DelegationTask;
import org.wso2.carbon.policy.mgt.core.internal.PolicyManagementDataHolder;
import org.wso2.carbon.policy.mgt.core.mgt.MonitoringManager;
import org.wso2.carbon.policy.mgt.core.mgt.impl.MonitoringManagerImpl;
import org.wso2.carbon.policy.mgt.core.mock.TestHeartBeatManagementService;
import org.wso2.carbon.policy.mgt.core.mock.TypeXDeviceManagementService;
import org.wso2.carbon.policy.mgt.core.task.MonitoringTask;
import org.wso2.carbon.policy.mgt.core.task.TaskScheduleService;
@ -93,6 +95,8 @@ public class PolicyManagerServiceImplTest extends BasePolicyManagementDAOTest {
DeviceManagementService deviceManagementService = new TypeXDeviceManagementService(DEVICE_TYPE_A);
deviceMgtService.registerDeviceType(deviceManagementService);
operationManager = new OperationManagerImpl(DEVICE_TYPE_A, deviceManagementService);
HeartBeatManagementService heartBeatManagementService = new TestHeartBeatManagementService();
DeviceManagementDataHolder.getInstance().setHeartBeatService(heartBeatManagementService);
enrollDevice(DEVICE1, DEVICE_TYPE_A);
createDeviceGroup(GROUP1);
DeviceGroup group1 = groupMgtService.getGroup(GROUP1, false);

@ -0,0 +1,24 @@
package org.wso2.carbon.policy.mgt.core.mock;
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
public class TestHeartBeatManagementService implements HeartBeatManagementService {
@Override
public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException {
return null;
}
@Override
public String updateServerContext(ServerContext ctx) throws HeartBeatManagementException {
return null;
}
@Override
public boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException {
return false;
}
}
Loading…
Cancel
Save