Adding interface for dynamic task allocation

merge-requests/713/head
Ace 4 years ago
parent 7b28665135
commit 37ef7b9047

@ -0,0 +1,45 @@
/*
* Copyright (c) 2020, Entgra (pvt) Ltd. (http://entgra.io) All Rights Reserved.
*
* Entgra (pvt) Ltd. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.device.mgt.common;
public class ServerCtxInfo {
private int activeServerCount;
private int localServerHashIdx;
public ServerCtxInfo(int activeServerCount, int localServerHashIdx){
this.activeServerCount = activeServerCount;
this.localServerHashIdx = localServerHashIdx;
}
public int getActiveServerCount() {
return activeServerCount;
}
public void setActiveServerCount(int activeServerCount) {
this.activeServerCount = activeServerCount;
}
public int getLocalServerHashIdx() {
return localServerHashIdx;
}
public void setLocalServerHashIdx(int localServerHashIdx) {
this.localServerHashIdx = localServerHashIdx;
}
}

@ -1,10 +0,0 @@
package org.wso2.carbon.device.mgt.common.dynamic.task.allocation;
import org.wso2.carbon.device.mgt.common.Device;
import org.wso2.carbon.ntask.core.Task;
import java.util.List;
public abstract class DynamicPartitionedScheduleTask implements Task {
}

@ -114,7 +114,8 @@
org.scannotation.*,
org.wso2.carbon.event.processor.stub,
org.wso2.carbon.identity.jwt.client.extension.service,
org.apache.commons.codec.binary
org.apache.commons.codec.binary,
io.entgra.server.bootup.heartbeat.beacon
</Import-Package>
<Export-Package>
!org.wso2.carbon.device.mgt.core.internal,
@ -364,6 +365,10 @@
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>io.entgra.server.bootup.heartbeat.beacon</artifactId>
</dependency>
</dependencies>
</project>

@ -834,7 +834,7 @@ public abstract class AbstractDeviceDAOImpl implements DeviceDAO {
" AND d.TENANT_ID = ?) d1" +
"WHERE d1.ID = e.DEVICE_ID" +
" AND TENANT_ID = ?" +
" AND MOD(d1.ID, 3) = 2" +
" AND MOD(d1.ID, ?) = ?" +
"ORDER BY e.DATE_OF_LAST_UPDATE DESC";
stmt = conn.prepareStatement(sql);

@ -18,6 +18,7 @@
package org.wso2.carbon.device.mgt.core.internal;
import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import org.wso2.carbon.device.mgt.common.DeviceStatusTaskPluginConfig;
import org.wso2.carbon.device.mgt.common.OperationMonitoringTaskConfig;
import org.wso2.carbon.device.mgt.common.app.mgt.ApplicationManager;
@ -56,6 +57,7 @@ public class DeviceManagementDataHolder {
private DeviceInformationManager deviceInformationManager;
private LicenseManager licenseManager;
private RegistryService registryService;
private HeartBeatManagementService heartBeatService;
private LicenseConfig licenseConfig;
private ApplicationManager appManager;
private AppManagementConfig appManagerConfig;
@ -286,4 +288,13 @@ public class DeviceManagementDataHolder {
public void setDeviceInformationManager(DeviceInformationManager deviceInformationManager) {
this.deviceInformationManager = deviceInformationManager;
}
public HeartBeatManagementService getHeartBeatService() {
return heartBeatService;
}
public void setHeartBeatService(
HeartBeatManagementService heartBeatService) {
this.heartBeatService = heartBeatService;
}
}

@ -17,6 +17,7 @@
*/
package org.wso2.carbon.device.mgt.core.internal;
import io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.framework.BundleContext;
@ -134,6 +135,12 @@ import java.util.concurrent.TimeUnit;
* policy="dynamic"
* bind="setDeviceTypeGeneratorService"
* unbind="unsetDeviceTypeGeneratorService"
* @scr.reference name="entgra.heart.beat.service"
* interface="io.entgra.server.bootup.heartbeat.beacon.service.HeartBeatManagementService"
* cardinality="0..1"
* policy="dynamic"
* bind="setHeartBeatService"
* unbind="unsetHeartBeatService"
*/
public class DeviceManagementServiceComponent {
@ -479,6 +486,28 @@ public class DeviceManagementServiceComponent {
DeviceManagementDataHolder.getInstance().setRegistryService(null);
}
/**
* Sets HeartBeatManagementService Service.
*
* @param heartBeatService An instance of HeartBeatManagementService
*/
protected void setHeartBeatService(HeartBeatManagementService heartBeatService) {
if (log.isDebugEnabled()) {
log.debug("Setting Heart Beat Service");
}
DeviceManagementDataHolder.getInstance().setHeartBeatService(heartBeatService);
}
/**
* Unsets Registry Service.
*/
protected void unsetHeartBeatService(HeartBeatManagementService heartBeatService) {
if (log.isDebugEnabled()) {
log.debug("Un setting Heart Beat Service");
}
DeviceManagementDataHolder.getInstance().setHeartBeatService(null);
}
protected void setDataSourceService(DataSourceService dataSourceService) {
/* This is to avoid mobile device management component getting initialized before the underlying datasources
are registered */

@ -0,0 +1,43 @@
package org.wso2.carbon.device.mgt.core.task.impl;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder;
import org.wso2.carbon.ntask.core.Task;
public abstract class DynamicPartitionedScheduleTask implements Task {
private static final Log log = LogFactory.getLog(DynamicPartitionedScheduleTask.class);
private static int serverHashIndex;
private static int activeServerCount;
@Override
public final void init() {
try {
ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo();
if(ctxInfo!=null){
activeServerCount = ctxInfo.getActiveServerCount();
serverHashIndex = ctxInfo.getLocalServerHashIdx();
setup();
} else {
log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.");
}
} catch (HeartBeatManagementException e) {
log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function." , e);
}
}
protected abstract void setup();
public int getLocalServerHash(){
return serverHashIndex;
}
public int getActiveServerCount(){
return activeServerCount;
}
}

@ -83,10 +83,12 @@ public class HeartBeatBeaconUtils {
hexadecimal[i] = String.format("%02X", hardwareAddress[i]);
}
String macAddress = String.join("-", hexadecimal);
int iotsCorePort = Integer.parseInt(System.getProperty("iot.core.https.port"));
ServerContext ctx = new ServerContext();
ctx.setHostName(localHost.getHostName());
ctx.setMacAddress(macAddress);
ctx.setCarbonServerPort(iotsCorePort);
return ctx;
}

@ -23,6 +23,7 @@ import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import java.util.List;
import java.util.Map;
/**
* This interface represents the key operations associated with persisting group related information.
@ -35,8 +36,6 @@ public interface HeartBeatDAO {
String retrieveExistingServerCtx(ServerContext ctx) throws HeartBeatDAOException;
int getActiveServerCount(int elapsedTimeInSeconds) throws HeartBeatDAOException;
List<ServerContext> getActiveServerDetails(int elapsedTimeInSeconds) throws HeartBeatDAOException;
Map<String, ServerContext> getActiveServerDetails(int elapsedTimeInSeconds) throws HeartBeatDAOException;
}

@ -30,7 +30,9 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
@ -46,11 +48,12 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
String sql;
sql = "INSERT INTO SERVER_HEART_BEAT_EVENTS(HOST_NAME, MAC, UUID) VALUES (?, ?, ?)";
sql = "INSERT INTO SERVER_HEART_BEAT_EVENTS(HOST_NAME, MAC, UUID, SERVER_PORT) VALUES (?, ?, ?, ?)";
stmt = conn.prepareStatement(sql, new String[]{"UUID"});
stmt.setString(1, ctx.getHostName());
stmt.setString(2, ctx.getMacAddress());
stmt.setString(3, UUID.randomUUID().toString());
stmt.setInt(4, ctx.getCarbonServerPort());
stmt.executeUpdate();
ResultSet result = stmt.getGeneratedKeys();
@ -94,10 +97,11 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
String uuid = null;
try {
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
String sql = "SELECT UUID FROM SERVER_HEART_BEAT_EVENTS WHERE HOST_NAME = ? AND MAC = ?";
String sql = "SELECT UUID FROM SERVER_HEART_BEAT_EVENTS WHERE HOST_NAME = ? AND MAC = ? AND SERVER_PORT = ?";
stmt = conn.prepareStatement(sql, new String[]{"UUID"});
stmt.setString(1, ctx.getHostName());
stmt.setString(2, ctx.getMacAddress());
stmt.setInt(3, ctx.getCarbonServerPort());
resultSet = stmt.executeQuery();
if (resultSet.next()){
@ -114,37 +118,13 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
}
@Override
public int getActiveServerCount(int elapsedTimeInSeconds) throws HeartBeatDAOException {
public Map<String, ServerContext> getActiveServerDetails(int elapsedTimeInSeconds) throws HeartBeatDAOException {
PreparedStatement stmt = null;
ResultSet resultSet = null;
int count = -1;
Map<String, ServerContext> ctxList = new HashMap<>();
try {
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
String sql = "SELECT COUNT(ID) AS COUNT from SERVER_HEART_BEAT_EVENTS WHERE " +
"LAST_UPDATED_TIMESTAMP > DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? SECOND)";
stmt = conn.prepareStatement(sql);
stmt.setInt(1, elapsedTimeInSeconds);
resultSet = stmt.executeQuery();
if (resultSet.next()) {
count = resultSet.getInt("COUNT");
}
} catch (SQLException e) {
throw new HeartBeatDAOException("Error occurred while retrieving acting server count with " +
"heartbeat updates within " + elapsedTimeInSeconds + " seconds.", e);
} finally {
HeartBeatBeaconDAOUtil.cleanupResources(stmt, resultSet);
}
return count;
}
@Override
public List<ServerContext> getActiveServerDetails(int elapsedTimeInSeconds) throws HeartBeatDAOException {
PreparedStatement stmt = null;
ResultSet resultSet = null;
List<ServerContext> ctxList = new ArrayList<>();
try {
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
String sql = "SELECT (@row_number:=@row_number + 1) AS IDX, UUID, HOST_NAME, MAC from " +
String sql = "SELECT (@row_number:=@row_number + 1) AS IDX, UUID, HOST_NAME, MAC, SERVER_PORT from " +
"SERVER_HEART_BEAT_EVENTS, (SELECT @row_number:=-1) AS TEMP " +
"WHERE LAST_UPDATED_TIMESTAMP > DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? SECOND) " +
"ORDER BY UUID";
@ -152,7 +132,7 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
stmt.setInt(1, elapsedTimeInSeconds);
resultSet = stmt.executeQuery();
while (resultSet.next()) {
ctxList.add(HeartBeatBeaconDAOUtil.populateContext(resultSet));
ctxList.put(resultSet.getString("UUID"), HeartBeatBeaconDAOUtil.populateContext(resultSet));
}
} catch (SQLException e) {
throw new HeartBeatDAOException("Error occurred while retrieving acting server count with " +

@ -86,6 +86,7 @@ public final class HeartBeatBeaconDAOUtil {
ctx.setUuid(resultSet.getString("UUID"));
ctx.setHostName(resultSet.getString("HOST_NAME"));
ctx.setMacAddress(resultSet.getString("MAC"));
ctx.setCarbonServerPort(resultSet.getInt("SERVER_PORT"));
return ctx;
}
}

@ -22,7 +22,7 @@ public class ServerContext {
private String hostName;
private String macAddress;
private String serverHash;
private int carbonServerPort;
private String uuid;
private int index;
@ -42,14 +42,6 @@ public class ServerContext {
this.macAddress = macAddress;
}
public String getServerHash() {
return serverHash;
}
public void setServerHash(String serverHash) {
this.serverHash = serverHash;
}
public String getUuid() {
return uuid;
}
@ -65,4 +57,12 @@ public class ServerContext {
public void setIndex(int index) {
this.index = index;
}
public int getCarbonServerPort() {
return carbonServerPort;
}
public void setCarbonServerPort(int carbonServerPort) {
this.carbonServerPort = carbonServerPort;
}
}

@ -20,12 +20,11 @@ package io.entgra.server.bootup.heartbeat.beacon.service;
import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
public interface HeartBeatManagementService {
int getActiveServerCount() throws HeartBeatManagementException;
int getServerLocalHashIndex() throws HeartBeatManagementException;
ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException;
String updateServerContext(ServerContext ctx) throws HeartBeatManagementException;

@ -26,65 +26,43 @@ import io.entgra.server.bootup.heartbeat.beacon.dto.HeartBeatEvent;
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
import io.entgra.server.bootup.heartbeat.beacon.dto.ServerContext;
import io.entgra.server.bootup.heartbeat.beacon.internal.HeartBeatBeaconDataHolder;
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
public class HeartBeatManagementServiceImpl implements HeartBeatManagementService {
@Override
public int getActiveServerCount() throws HeartBeatManagementException {
HeartBeatDAO heartBeatDAO;
int activeServerCount = -1;
try {
HeartBeatBeaconDAOFactory.openConnection();
heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO();
int timeOutIntervalInSeconds = HeartBeatBeaconConfig.getInstance().getServerTimeOutIntervalInSeconds();
activeServerCount = heartBeatDAO.getActiveServerCount(timeOutIntervalInSeconds);
} catch (SQLException e) {
String msg = "Error occurred while opening a connection to the underlying data source";
throw new HeartBeatManagementException(msg, e);
} catch (HeartBeatDAOException e) {
String msg = "Error Occured while retrieving active server count.";
throw new HeartBeatManagementException(msg, e);
} finally {
HeartBeatBeaconDAOFactory.closeConnection();
}
return activeServerCount;
}
@Override
public int getServerLocalHashIndex() throws HeartBeatManagementException {
public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException {
HeartBeatDAO heartBeatDAO;
int hashIndex = -1;
ServerContext localServerCtx = null;
ServerCtxInfo serverCtxInfo = null;
try {
HeartBeatBeaconDAOFactory.openConnection();
heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO();
int timeOutIntervalInSeconds = HeartBeatBeaconConfig.getInstance().getServerTimeOutIntervalInSeconds();
String localServerUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
List<ServerContext> serverCtxList = heartBeatDAO.getActiveServerDetails(timeOutIntervalInSeconds);
for(ServerContext ctx : serverCtxList){
if(ctx.getUuid() == localServerUUID){
localServerCtx = ctx;
break;
Map<String, ServerContext> serverCtxMap = heartBeatDAO.getActiveServerDetails(timeOutIntervalInSeconds);
if(!serverCtxMap.isEmpty()) {
localServerCtx = serverCtxMap.get(localServerUUID);
if (localServerCtx != null) {
hashIndex = localServerCtx.getIndex();
serverCtxInfo = new ServerCtxInfo(serverCtxMap.size(), hashIndex);
}
}
if(localServerCtx != null){
hashIndex = localServerCtx.getIndex();
}
} catch (SQLException e) {
String msg = "Error occurred while opening a connection to the underlying data source";
throw new HeartBeatManagementException(msg, e);
} catch (HeartBeatDAOException e) {
String msg = "Error Occured while retrieving active server count.";
String msg = "Error occurred while retrieving active server count.";
throw new HeartBeatManagementException(msg, e);
} finally {
HeartBeatBeaconDAOFactory.closeConnection();
}
return hashIndex;
return serverCtxInfo;
}
@Override

@ -35,6 +35,7 @@
</parent>
<modules>
<module>components/task-allocation</module>
<module>components/device-mgt</module>
<module>components/device-mgt-extensions</module>
<module>components/identity-extensions</module>
@ -45,7 +46,6 @@
<module>components/webapp-authenticator-framework</module>
<module>components/email-sender</module>
<module>components/ui-request-interceptor</module>
<module>components/task-allocation</module>
<module>features/device-mgt</module>
<module>features/apimgt-extensions</module>
<module>features/application-mgt</module>
@ -377,6 +377,11 @@
<type>zip</type>
<version>${carbon.device.mgt.version}</version>
</dependency>
<dependency>
<groupId>org.wso2.carbon.devicemgt</groupId>
<artifactId>io.entgra.server.bootup.heartbeat.beacon</artifactId>
<version>${carbon.device.mgt.version}</version>
</dependency>
<!-- Device Management dependencies -->
<!-- Governance dependencies -->

Loading…
Cancel
Save