Fix db connection pool exhaustion due to too many queries

appsubscriptiontask
Charitha Goonetilleke 10 months ago
parent dd03a073b2
commit 8f1781f4b6

@ -22,7 +22,6 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconC
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconUtils;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.config.datasource.DataSourceConfig;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.InvalidConfigurationStateException;
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifier;
import org.w3c.dom.Document;
import org.wso2.carbon.utils.CarbonUtils;

@ -187,7 +187,7 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
}
}
} catch (SQLException e) {
String msg = "Error occurred checking existense of UUID" + uuid +
String msg = "Error occurred checking existence of UUID" + uuid +
" amongst heartbeat meta info.";
log.error(msg, e);
throw new HeartBeatDAOException(msg, e);

@ -65,7 +65,7 @@ public class HeartBeatExecutor implements ServerStartupObserver {
Executors.newSingleThreadScheduledExecutor();
if (CONFIG == null) {
String msg = "Error while initiating schedule taks for recording heartbeats.";
String msg = "Error while initiating schedule tasks for recording heartbeats.";
log.error(msg);
throw new HeartBeatBeaconConfigurationException(msg);
}
@ -78,15 +78,15 @@ public class HeartBeatExecutor implements ServerStartupObserver {
}
int timeOutIntervalInSeconds = CONFIG.getServerTimeOutIntervalInSeconds();
int timeSkew = CONFIG.getTimeSkew();
int cumilativeTimeOut = timeOutIntervalInSeconds + timeSkew;
int cumulativeTimeOut = timeOutIntervalInSeconds + timeSkew;
final String designatedUUID = uuid;
HeartBeatBeaconDataHolder.getInstance().setLocalServerUUID(designatedUUID);
Runnable periodicTask = new Runnable() {
public void run() {
try {
recordHeartBeat(designatedUUID);
electDynamicTaskExecutionCandidate(cumilativeTimeOut);
notifyClusterFormationChanged(cumilativeTimeOut);
electDynamicTaskExecutionCandidate(cumulativeTimeOut);
notifyClusterFormationChanged(cumulativeTimeOut);
} catch (Exception e) {
log.error("Error while executing record heart beat task. This will result in schedule operation malfunction.", e);
}
@ -97,7 +97,7 @@ public class HeartBeatExecutor implements ServerStartupObserver {
CONFIG.getNotifierFrequency() != 0 ? CONFIG.getNotifierFrequency() : DEFAULT__NOTIFIER_INTERVAL,
TimeUnit.SECONDS);
} catch (HeartBeatManagementException e) {
String msg = "Error occured while updating initial server context.";
String msg = "Error occurred while updating initial server context.";
log.error(msg);
throw new HeartBeatBeaconConfigurationException(msg, e);
} catch (IOException e) {
@ -111,13 +111,14 @@ public class HeartBeatExecutor implements ServerStartupObserver {
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().recordHeartBeat(new HeartBeatEvent(uuid));
}
static void electDynamicTaskExecutionCandidate(int cumilativeTimeOut)
static void electDynamicTaskExecutionCandidate(int cumulativeTimeOut)
throws HeartBeatManagementException {
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().electCandidate(cumilativeTimeOut);
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().electCandidate(cumulativeTimeOut);
}
static void notifyClusterFormationChanged(int cumilativeTimeOut) throws HeartBeatManagementException {
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().notifyClusterFormationChanged(cumilativeTimeOut);
static void notifyClusterFormationChanged(int cumulativeTimeOut) throws HeartBeatManagementException {
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService()
.notifyClusterFormationChanged(cumulativeTimeOut);
}
}

@ -50,6 +50,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
private static int lastActiveCount = -1;
private static int lastHashIndex = -1;
private static volatile boolean isQualified = false;
public HeartBeatManagementServiceImpl() {
this.heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO();
@ -58,17 +59,17 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
@Override
public ServerCtxInfo getServerCtxInfo() throws HeartBeatManagementException {
int hashIndex = -1;
ServerContext localServerCtx = null;
int hashIndex;
ServerContext localServerCtx;
ServerCtxInfo serverCtxInfo = null;
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
try {
HeartBeatBeaconDAOFactory.openConnection();
int timeOutIntervalInSeconds = HeartBeatBeaconConfig.getInstance().getServerTimeOutIntervalInSeconds();
int timeSkew = HeartBeatBeaconConfig.getInstance().getTimeSkew();
int cumilativeTimeOut = timeOutIntervalInSeconds + timeSkew;
int cumulativeTimeOut = timeOutIntervalInSeconds + timeSkew;
String localServerUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
Map<String, ServerContext> serverCtxMap = heartBeatDAO.getActiveServerDetails(cumilativeTimeOut);
Map<String, ServerContext> serverCtxMap = heartBeatDAO.getActiveServerDetails(cumulativeTimeOut);
if (!serverCtxMap.isEmpty()) {
localServerCtx = serverCtxMap.get(localServerUUID);
if (localServerCtx != null) {
@ -97,7 +98,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
@Override
public boolean isTaskPartitioningEnabled() throws HeartBeatManagementException {
boolean enabled = false;
boolean enabled;
if (HeartBeatBeaconConfig.getInstance() != null) {
enabled = HeartBeatBeaconConfig.getInstance().isEnabled();
} else {
@ -111,7 +112,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
@Override
public String updateServerContext(ServerContext ctx) throws HeartBeatManagementException {
String uuid = null;
String uuid;
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
try {
HeartBeatBeaconDAOFactory.beginTransaction();
@ -121,12 +122,13 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
HeartBeatBeaconDAOFactory.commitTransaction();
}
} catch (HeartBeatDAOException e) {
String msg = "Error Occured while retrieving server context.";
String msg = "Error Occurred while retrieving server context.";
log.error(msg, e);
throw new HeartBeatManagementException(msg, e);
} catch (TransactionManagementException e) {
HeartBeatBeaconDAOFactory.rollbackTransaction();
String msg = "Error occurred while updating server context. Issue in opening a connection to the underlying data source";
String msg = "Error occurred while updating server context. Issue in opening a connection to the " +
"underlying data source";
log.error(msg, e);
throw new HeartBeatManagementException(msg, e);
} finally {
@ -141,35 +143,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
}
@Override
public boolean isQualifiedToExecuteTask() throws HeartBeatManagementException {
boolean isQualified = false;
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
try {
String localServerUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
HeartBeatBeaconDAOFactory.openConnection();
ElectedCandidate candidate = heartBeatDAO.retrieveCandidate();
if (candidate != null && candidate.getServerUUID().equalsIgnoreCase(localServerUUID)) {
isQualified = true;
if (log.isDebugEnabled()) {
log.debug("Node : " + localServerUUID + " is qualified to execute randomly assigned task.");
}
}
} catch (HeartBeatDAOException e) {
String msg = "Error occurred while checking if server is qualified to execute randomly designated task.";
log.error(msg, e);
throw new HeartBeatManagementException(msg, e);
} catch (SQLException e) {
String msg = "Error occurred while opening a connection to the underlying data source";
log.error(msg, e);
throw new HeartBeatManagementException(msg, e);
} finally {
HeartBeatBeaconDAOFactory.closeConnection();
}
} else {
String msg = "Heart Beat Configuration Disabled. Error occurred while checking if server is qualified to execute randomly designated task.";
log.error(msg);
throw new HeartBeatManagementException(msg);
}
public boolean isQualifiedToExecuteTask() {
return isQualified;
}
@ -229,7 +203,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
if (presentCandidate != null) {
//if candidate is older than stipulated elapsed-time, purge and re-elect
if (presentCandidate.getTimeOfElection().before(new Timestamp(System.currentTimeMillis()
- TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)))) {
- TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)))) {
heartBeatDAO.purgeCandidates();
electCandidate(servers);
}
@ -240,13 +214,24 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
}
HeartBeatBeaconDAOFactory.commitTransaction();
}
ElectedCandidate candidate = heartBeatDAO.retrieveCandidate();
String localServerUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
if (candidate != null && candidate.getServerUUID().equalsIgnoreCase(localServerUUID)) {
isQualified = true;
if (log.isDebugEnabled()) {
log.debug("Node : " + localServerUUID + " is qualified to execute randomly assigned task.");
}
} else {
isQualified = false;
}
} catch (HeartBeatDAOException e) {
String msg = "Error occurred while electing candidate for dynamic task execution.";
log.error(msg, e);
throw new HeartBeatManagementException(msg, e);
} catch (TransactionManagementException e) {
HeartBeatBeaconDAOFactory.rollbackTransaction();
String msg = "Error occurred while electing candidate for dynamic task execution. Issue in opening a connection to the underlying data source";
String msg = "Error occurred while electing candidate for dynamic task execution. " +
"Issue in opening a connection to the underlying data source";
log.error(msg, e);
throw new HeartBeatManagementException(msg, e);
} finally {
@ -258,6 +243,7 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
throw new HeartBeatManagementException(msg);
}
}
@Override
public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException {
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
@ -278,7 +264,8 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
lastHashIndex = serverContext.getIndex();
lastActiveCount = servers.size();
ClusterFormationChangedNotifierRepository repository = HeartBeatBeaconDataHolder.getInstance().getClusterFormationChangedNotifierRepository();
ClusterFormationChangedNotifierRepository repository = HeartBeatBeaconDataHolder.getInstance()
.getClusterFormationChangedNotifierRepository();
Map<String, ClusterFormationChangedNotifier> notifiers = repository.getNotifiers();
for (String type : notifiers.keySet()) {
ClusterFormationChangedNotifier notifier = notifiers.get(type);
@ -372,8 +359,8 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
HeartBeatBeaconDAOFactory.openConnection();
int timeOutIntervalInSeconds = HeartBeatBeaconConfig.getInstance().getServerTimeOutIntervalInSeconds();
int timeSkew = HeartBeatBeaconConfig.getInstance().getTimeSkew();
int cumilativeTimeOut = timeOutIntervalInSeconds + timeSkew;
Map<String, ServerContext> serverCtxMap = heartBeatDAO.getActiveServerDetails(cumilativeTimeOut);
int cumulativeTimeOut = timeOutIntervalInSeconds + timeSkew;
Map<String, ServerContext> serverCtxMap = heartBeatDAO.getActiveServerDetails(cumulativeTimeOut);
for (String uuid : serverCtxMap.keySet()) {
ServerContext serverContext = serverCtxMap.get(uuid);
activeServers.put(serverContext.getIndex(), serverContext);
@ -396,4 +383,5 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
}
return activeServers;
}
}

@ -19,20 +19,19 @@
package io.entgra.device.mgt.core.subtype.mgt.cache;
import com.google.common.cache.CacheLoader;
import io.entgra.device.mgt.core.subtype.mgt.dto.DeviceSubTypeCacheKey;
import io.entgra.device.mgt.core.subtype.mgt.dao.DeviceSubTypeDAO;
import io.entgra.device.mgt.core.subtype.mgt.dao.DeviceSubTypeDAOFactory;
import io.entgra.device.mgt.core.subtype.mgt.dao.util.ConnectionManagerUtil;
import io.entgra.device.mgt.core.subtype.mgt.dto.DeviceSubType;
import io.entgra.device.mgt.core.subtype.mgt.dto.DeviceSubTypeCacheKey;
import io.entgra.device.mgt.core.subtype.mgt.exception.DBConnectionException;
import io.entgra.device.mgt.core.subtype.mgt.exception.SubTypeMgtDAOException;
import io.entgra.device.mgt.core.subtype.mgt.exception.SubTypeMgtPluginException;
import io.entgra.device.mgt.core.subtype.mgt.util.DeviceSubTypeMgtUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class GetDeviceSubTypeCacheLoader extends CacheLoader<String, DeviceSubType> {
public class GetDeviceSubTypeCacheLoader extends CacheLoader<DeviceSubTypeCacheKey, DeviceSubType> {
private static final Log log = LogFactory.getLog(GetDeviceSubTypeCacheLoader.class);
@ -43,14 +42,13 @@ public class GetDeviceSubTypeCacheLoader extends CacheLoader<String, DeviceSubTy
}
@Override
public DeviceSubType load(String key) throws SubTypeMgtPluginException {
DeviceSubTypeCacheKey deviceSubTypeCacheKey = DeviceSubTypeMgtUtil.getDeviceSubTypeCacheKey(key);
public DeviceSubType load(DeviceSubTypeCacheKey deviceSubTypeCacheKey) throws SubTypeMgtPluginException {
int tenantId = deviceSubTypeCacheKey.getTenantId();
String subTypeId = deviceSubTypeCacheKey.getSubTypeId();
String deviceType = deviceSubTypeCacheKey.getDeviceType();
if (log.isTraceEnabled()) {
log.trace("Loading Device subtype for " + deviceType + " subtype & subtype Id : " + subTypeId);
if (log.isDebugEnabled()) {
log.debug("Loading Device subtype for " + deviceType + " subtype & subtype Id : " + subTypeId);
}
try {
ConnectionManagerUtil.openDBConnection();

@ -19,9 +19,10 @@
package io.entgra.device.mgt.core.subtype.mgt.dto;
public class DeviceSubTypeCacheKey {
int tenantId;
String subTypeId;
String deviceType;
private int tenantId;
private String subTypeId;
private String deviceType;
public int getTenantId() {
return tenantId;
@ -46,4 +47,15 @@ public class DeviceSubTypeCacheKey {
public void setDeviceType(String deviceType) {
this.deviceType = deviceType;
}
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public String toString() {
return tenantId + "|" + subTypeId + "|" + deviceType;
}
}

@ -22,6 +22,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.entgra.device.mgt.core.subtype.mgt.cache.GetDeviceSubTypeCacheLoader;
import io.entgra.device.mgt.core.subtype.mgt.dto.DeviceSubTypeCacheKey;
import io.entgra.device.mgt.core.subtype.mgt.exception.BadRequestException;
import io.entgra.device.mgt.core.subtype.mgt.exception.DBConnectionException;
import io.entgra.device.mgt.core.subtype.mgt.exception.SubTypeMgtDAOException;
@ -42,8 +43,10 @@ import java.util.concurrent.TimeUnit;
public class DeviceSubTypeServiceImpl implements DeviceSubTypeService {
private static final Log log = LogFactory.getLog(DeviceSubTypeServiceImpl.class);
private static final LoadingCache<String, DeviceSubType> deviceSubTypeCache = CacheBuilder.newBuilder()
.expireAfterWrite(15, TimeUnit.MINUTES).build(new GetDeviceSubTypeCacheLoader());
private static final LoadingCache<DeviceSubTypeCacheKey, DeviceSubType> deviceSubTypeCache
= CacheBuilder.newBuilder()
.expireAfterWrite(15, TimeUnit.MINUTES)
.build(new GetDeviceSubTypeCacheLoader());
private final DeviceSubTypeDAO deviceSubTypeDAO;
public DeviceSubTypeServiceImpl() {
@ -52,7 +55,7 @@ public class DeviceSubTypeServiceImpl implements DeviceSubTypeService {
@Override
public boolean addDeviceSubType(DeviceSubType deviceSubType) throws SubTypeMgtPluginException {
String msg = "";
String msg;
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
deviceSubType.setTenantId(tenantId);
@ -72,9 +75,6 @@ public class DeviceSubTypeServiceImpl implements DeviceSubTypeService {
throw new SubTypeMgtPluginException(msg);
}
ConnectionManagerUtil.commitDBTransaction();
String key = DeviceSubTypeMgtUtil.setDeviceSubTypeCacheKey(tenantId, deviceSubType.getSubTypeId(),
deviceSubType.getDeviceType());
deviceSubTypeCache.put(key, deviceSubType);
return true;
} catch (DBConnectionException e) {
msg = "Error occurred while obtaining the database connection to add device subtype for " +
@ -89,6 +89,10 @@ public class DeviceSubTypeServiceImpl implements DeviceSubTypeService {
throw new SubTypeMgtPluginException(msg, e);
} finally {
ConnectionManagerUtil.closeDBConnection();
DeviceSubTypeCacheKey key = DeviceSubTypeMgtUtil.getDeviceSubTypeCacheKey(tenantId,
deviceSubType.getSubTypeId(),
deviceSubType.getDeviceType());
deviceSubTypeCache.refresh(key);
}
}
@ -96,7 +100,7 @@ public class DeviceSubTypeServiceImpl implements DeviceSubTypeService {
public boolean updateDeviceSubType(String subTypeId, int tenantId, String deviceType,
String subTypeName, String typeDefinition)
throws SubTypeMgtPluginException {
String msg = "";
String msg;
DeviceSubType deviceSubTypeOld = getDeviceSubType(subTypeId, tenantId, deviceType);
if (deviceSubTypeOld == null) {
@ -133,7 +137,7 @@ public class DeviceSubTypeServiceImpl implements DeviceSubTypeService {
throw new SubTypeMgtPluginException(msg, e);
} finally {
ConnectionManagerUtil.closeDBConnection();
String key = DeviceSubTypeMgtUtil.setDeviceSubTypeCacheKey(tenantId, subTypeId, deviceType);
DeviceSubTypeCacheKey key = DeviceSubTypeMgtUtil.getDeviceSubTypeCacheKey(tenantId, subTypeId, deviceType);
deviceSubTypeCache.refresh(key);
}
}
@ -142,7 +146,7 @@ public class DeviceSubTypeServiceImpl implements DeviceSubTypeService {
public DeviceSubType getDeviceSubType(String subTypeId, int tenantId, String deviceType)
throws SubTypeMgtPluginException {
try {
String key = DeviceSubTypeMgtUtil.setDeviceSubTypeCacheKey(tenantId, subTypeId, deviceType);
DeviceSubTypeCacheKey key = DeviceSubTypeMgtUtil.getDeviceSubTypeCacheKey(tenantId, subTypeId, deviceType);
return deviceSubTypeCache.get(key);
} catch (CacheLoader.InvalidCacheLoadException e) {
String msg = "Not having any" + deviceType + " subtype for subtype id: " + subTypeId;
@ -179,12 +183,7 @@ public class DeviceSubTypeServiceImpl implements DeviceSubTypeService {
@Override
public int getDeviceSubTypeCount(String deviceType) throws SubTypeMgtPluginException {
try {
int result = deviceSubTypeDAO.getDeviceSubTypeCount(deviceType);
if (result <= 0) {
String msg = "There are no any subtypes for device type: " + deviceType;
log.error(msg);
}
return result;
return deviceSubTypeDAO.getDeviceSubTypeCount(deviceType);
} catch (SubTypeMgtDAOException e) {
String msg = "Error occurred in the database level while retrieving device subtypes count for " + deviceType
+ " subtypes";

@ -19,23 +19,15 @@
package io.entgra.device.mgt.core.subtype.mgt.util;
import io.entgra.device.mgt.core.subtype.mgt.dto.DeviceSubTypeCacheKey;
import io.entgra.device.mgt.core.subtype.mgt.dto.DeviceSubType;
public class DeviceSubTypeMgtUtil {
public static String setDeviceSubTypeCacheKey(int tenantId, String subTypeId, String deviceType) {
return tenantId + "|" + subTypeId + "|" + deviceType.toString();
}
public static DeviceSubTypeCacheKey getDeviceSubTypeCacheKey(String key) {
String[] keys = key.split("\\|");
int tenantId = Integer.parseInt(keys[0]);
String subTypeId = keys[1];
String deviceType = keys[2];
public static DeviceSubTypeCacheKey getDeviceSubTypeCacheKey(int tenantId, String subTypeId, String deviceType) {
DeviceSubTypeCacheKey deviceSubTypesCacheKey = new DeviceSubTypeCacheKey();
deviceSubTypesCacheKey.setTenantId(tenantId);
deviceSubTypesCacheKey.setSubTypeId(subTypeId);
deviceSubTypesCacheKey.setDeviceType(deviceType);
return deviceSubTypesCacheKey;
}
}

Loading…
Cancel
Save