|
|
|
@ -49,7 +49,6 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String recordServerCtx(ServerContext ctx) throws HeartBeatDAOException {
|
|
|
|
|
PreparedStatement stmt = null;
|
|
|
|
|
String uuid = null;
|
|
|
|
|
try {
|
|
|
|
|
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
|
|
|
|
@ -57,167 +56,167 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
|
|
|
|
|
|
|
|
|
|
String sql;
|
|
|
|
|
sql = "INSERT INTO SERVER_HEART_BEAT_EVENTS(HOST_NAME, UUID, SERVER_PORT) VALUES (?, ?, ?)";
|
|
|
|
|
stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
|
|
|
|
|
stmt.setString(1, ctx.getHostName());
|
|
|
|
|
stmt.setString(2, serverUUID);
|
|
|
|
|
stmt.setInt(3, ctx.getCarbonServerPort());
|
|
|
|
|
|
|
|
|
|
if(stmt.executeUpdate() > 0){
|
|
|
|
|
uuid = serverUUID;
|
|
|
|
|
try (PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
|
|
|
|
|
stmt.setString(1, ctx.getHostName());
|
|
|
|
|
stmt.setString(2, serverUUID);
|
|
|
|
|
stmt.setInt(3, ctx.getCarbonServerPort());
|
|
|
|
|
|
|
|
|
|
if (stmt.executeUpdate() > 0) {
|
|
|
|
|
uuid = serverUUID;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new HeartBeatDAOException("Error occurred while persisting server context for : '" +
|
|
|
|
|
"port '" + ctx.getCarbonServerPort() + "' " +
|
|
|
|
|
"hostname : '" + ctx.getHostName() + "' ", e);
|
|
|
|
|
} finally {
|
|
|
|
|
HeartBeatBeaconDAOUtil.cleanupResources(stmt, null);
|
|
|
|
|
String msg = "Error occurred while persisting server context for : '" +
|
|
|
|
|
"port '" + ctx.getCarbonServerPort() + "' " +
|
|
|
|
|
"hostname : '" + ctx.getHostName() + "'";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatDAOException(msg, e);
|
|
|
|
|
}
|
|
|
|
|
return uuid;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean recordElectedCandidate(String serverUUID) throws HeartBeatDAOException {
|
|
|
|
|
PreparedStatement stmt = null;
|
|
|
|
|
try {
|
|
|
|
|
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
|
|
|
|
|
String sql;
|
|
|
|
|
sql = "INSERT INTO ELECTED_LEADER_META_INFO(UUID) VALUES (?)";
|
|
|
|
|
stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
|
|
|
|
|
stmt.setString(1, serverUUID);
|
|
|
|
|
|
|
|
|
|
return stmt.executeUpdate() > 0;
|
|
|
|
|
try (PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
|
|
|
|
|
stmt.setString(1, serverUUID);
|
|
|
|
|
return stmt.executeUpdate() > 0;
|
|
|
|
|
}
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new HeartBeatDAOException("Error occurred while persisting UUID of chosen " +
|
|
|
|
|
"elected dynamic task execution candidate : " + serverUUID , e);
|
|
|
|
|
} finally {
|
|
|
|
|
HeartBeatBeaconDAOUtil.cleanupResources(stmt, null);
|
|
|
|
|
String msg = "Error occurred while persisting UUID of chosen " +
|
|
|
|
|
"elected dynamic task execution candidate : " + serverUUID;
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatDAOException(msg, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void purgeCandidates() throws HeartBeatDAOException {
|
|
|
|
|
Statement stmt = null;
|
|
|
|
|
try {
|
|
|
|
|
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
|
|
|
|
|
conn.setAutoCommit(false);
|
|
|
|
|
String sql = "TRUNCATE TABLE ELECTED_LEADER_META_INFO";
|
|
|
|
|
stmt = conn.createStatement();
|
|
|
|
|
stmt.execute(sql);
|
|
|
|
|
conn.commit();
|
|
|
|
|
try (Statement stmt = conn.createStatement()) {
|
|
|
|
|
stmt.execute(sql);
|
|
|
|
|
conn.commit();
|
|
|
|
|
}
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new HeartBeatDAOException("Error occurred while truncating ELECTED_LEADER_META_INFO table.", e);
|
|
|
|
|
} finally {
|
|
|
|
|
HeartBeatBeaconDAOUtil.cleanupResources(stmt, null);
|
|
|
|
|
String msg = "Error occurred while truncating ELECTED_LEADER_META_INFO table.";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatDAOException(msg, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public ElectedCandidate retrieveCandidate() throws HeartBeatDAOException {
|
|
|
|
|
Statement stmt = null;
|
|
|
|
|
ResultSet resultSet = null;
|
|
|
|
|
ElectedCandidate candidate = null;
|
|
|
|
|
try {
|
|
|
|
|
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
|
|
|
|
|
String sql = "SELECT * from ELECTED_LEADER_META_INFO";
|
|
|
|
|
stmt = conn.createStatement();
|
|
|
|
|
resultSet = stmt.executeQuery(sql);
|
|
|
|
|
while (resultSet.next()){
|
|
|
|
|
candidate = HeartBeatBeaconDAOUtil.populateCandidate(resultSet);
|
|
|
|
|
|
|
|
|
|
try (Statement stmt = conn.createStatement()) {
|
|
|
|
|
try (ResultSet resultSet = stmt.executeQuery(sql)) {
|
|
|
|
|
while (resultSet.next()) {
|
|
|
|
|
candidate = HeartBeatBeaconDAOUtil.populateCandidate(resultSet);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new HeartBeatDAOException("Error occurred while retrieving meta information of elected candidate", e);
|
|
|
|
|
} finally {
|
|
|
|
|
HeartBeatBeaconDAOUtil.cleanupResources(stmt, resultSet);
|
|
|
|
|
String msg = "Error occurred while retrieving meta information of elected candidate";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatDAOException(msg, e);
|
|
|
|
|
}
|
|
|
|
|
return candidate;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean acknowledgeTask(String uuid, List<String> taskList) throws HeartBeatDAOException {
|
|
|
|
|
PreparedStatement stmt = null;
|
|
|
|
|
public boolean acknowledgeTask(String uuid, List<String> taskList)
|
|
|
|
|
throws HeartBeatDAOException {
|
|
|
|
|
try {
|
|
|
|
|
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
|
|
|
|
|
String sql;
|
|
|
|
|
sql = "UPDATE ELECTED_LEADER_META_INFO SET ACKNOWLEDGED_TASK_LIST = ? WHERE UUID = ?";
|
|
|
|
|
stmt = conn.prepareStatement(sql, new String[]{"UUID"});
|
|
|
|
|
stmt.setString(1, String.join(",", taskList));
|
|
|
|
|
stmt.setString(2, uuid);
|
|
|
|
|
|
|
|
|
|
return stmt.executeUpdate() > 0;
|
|
|
|
|
try (PreparedStatement stmt = conn.prepareStatement(sql, new String[]{"UUID"})) {
|
|
|
|
|
stmt.setString(1, String.join(",", taskList));
|
|
|
|
|
stmt.setString(2, uuid);
|
|
|
|
|
return stmt.executeUpdate() > 0;
|
|
|
|
|
}
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new HeartBeatDAOException("Error occurred while updating task list of elected server : '" +
|
|
|
|
|
uuid + "' and task list " + taskList, e);
|
|
|
|
|
} finally {
|
|
|
|
|
HeartBeatBeaconDAOUtil.cleanupResources(stmt, null);
|
|
|
|
|
String msg = "Error occurred while updating task list of elected server : '" +
|
|
|
|
|
uuid + "' and task list " + taskList;
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatDAOException(msg, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean recordHeatBeat(HeartBeatEvent event) throws HeartBeatDAOException {
|
|
|
|
|
PreparedStatement stmt = null;
|
|
|
|
|
try {
|
|
|
|
|
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
|
|
|
|
|
String sql;
|
|
|
|
|
sql = "UPDATE SERVER_HEART_BEAT_EVENTS SET LAST_UPDATED_TIMESTAMP = ? WHERE UUID = ?";
|
|
|
|
|
stmt = conn.prepareStatement(sql, new String[]{"ID"});
|
|
|
|
|
stmt.setTimestamp(1, event.getTime());
|
|
|
|
|
stmt.setString(2, event.getServerUUID());
|
|
|
|
|
|
|
|
|
|
return stmt.executeUpdate() > 0;
|
|
|
|
|
try (PreparedStatement stmt = conn.prepareStatement(sql, new String[]{"ID"})) {
|
|
|
|
|
stmt.setTimestamp(1, event.getTime());
|
|
|
|
|
stmt.setString(2, event.getServerUUID());
|
|
|
|
|
return stmt.executeUpdate() > 0;
|
|
|
|
|
}
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new HeartBeatDAOException("Error occurred while updating heartbeat event against server with UUID : '" +
|
|
|
|
|
event.getServerUUID() + "' and timestamp " + event.getTime(), e);
|
|
|
|
|
} finally {
|
|
|
|
|
HeartBeatBeaconDAOUtil.cleanupResources(stmt, null);
|
|
|
|
|
String msg = "Error occurred while updating heartbeat event against server with UUID : '" +
|
|
|
|
|
event.getServerUUID() + "' and timestamp " + event.getTime();
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatDAOException(msg, e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean checkUUIDValidity(String uuid) throws HeartBeatDAOException {
|
|
|
|
|
PreparedStatement stmt = null;
|
|
|
|
|
ResultSet resultSet = null;
|
|
|
|
|
boolean result = false;
|
|
|
|
|
try {
|
|
|
|
|
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
|
|
|
|
|
String sql = "SELECT ID FROM SERVER_HEART_BEAT_EVENTS WHERE UUID = ?";
|
|
|
|
|
stmt = conn.prepareStatement(sql);
|
|
|
|
|
stmt.setString(1, uuid);
|
|
|
|
|
|
|
|
|
|
resultSet = stmt.executeQuery();
|
|
|
|
|
if(resultSet.next()){
|
|
|
|
|
result = true;
|
|
|
|
|
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
|
|
|
|
|
stmt.setString(1, uuid);
|
|
|
|
|
try (ResultSet resultSet = stmt.executeQuery()) {
|
|
|
|
|
if (resultSet.next()) {
|
|
|
|
|
result = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new HeartBeatDAOException("Error occurred checking existense of UUID" + uuid +
|
|
|
|
|
" amongst heartbeat meta info ", e);
|
|
|
|
|
} finally {
|
|
|
|
|
HeartBeatBeaconDAOUtil.cleanupResources(stmt, resultSet);
|
|
|
|
|
String msg = "Error occurred checking existense of UUID" + uuid +
|
|
|
|
|
" amongst heartbeat meta info.";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatDAOException(msg, e);
|
|
|
|
|
}
|
|
|
|
|
return result;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public String retrieveExistingServerCtx(ServerContext ctx) throws HeartBeatDAOException {
|
|
|
|
|
PreparedStatement stmt = null;
|
|
|
|
|
ResultSet resultSet = null;
|
|
|
|
|
String uuid = null;
|
|
|
|
|
try {
|
|
|
|
|
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
|
|
|
|
|
String sql = "SELECT UUID FROM SERVER_HEART_BEAT_EVENTS WHERE HOST_NAME = ? AND SERVER_PORT = ?";
|
|
|
|
|
stmt = conn.prepareStatement(sql, new String[]{"UUID"});
|
|
|
|
|
stmt.setString(1, ctx.getHostName());
|
|
|
|
|
stmt.setInt(2, ctx.getCarbonServerPort());
|
|
|
|
|
|
|
|
|
|
resultSet = stmt.executeQuery();
|
|
|
|
|
if (resultSet.next()){
|
|
|
|
|
uuid = resultSet.getString("UUID");
|
|
|
|
|
try (PreparedStatement stmt = conn.prepareStatement(sql, new String[]{"UUID"})) {
|
|
|
|
|
stmt.setString(1, ctx.getHostName());
|
|
|
|
|
stmt.setInt(2, ctx.getCarbonServerPort());
|
|
|
|
|
try (ResultSet resultSet = stmt.executeQuery()) {
|
|
|
|
|
if (resultSet.next()) {
|
|
|
|
|
uuid = resultSet.getString("UUID");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new HeartBeatDAOException("Error occurred while retrieving meta information for heart beat event from " +
|
|
|
|
|
"port '" + ctx.getCarbonServerPort() + "' " +
|
|
|
|
|
"hostname : '" + ctx.getHostName() + "' ", e);
|
|
|
|
|
} finally {
|
|
|
|
|
HeartBeatBeaconDAOUtil.cleanupResources(stmt, resultSet);
|
|
|
|
|
String msg = "Error occurred while retrieving meta information for heart beat event from " +
|
|
|
|
|
"port '" + ctx.getCarbonServerPort() + "' " +
|
|
|
|
|
"hostname : '" + ctx.getHostName() + "'";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatDAOException(msg, e);
|
|
|
|
|
}
|
|
|
|
|
return uuid;
|
|
|
|
|
}
|
|
|
|
@ -225,8 +224,6 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
|
|
|
|
|
@Override
|
|
|
|
|
public Map<String, ServerContext> getActiveServerDetails(int elapsedTimeInSeconds)
|
|
|
|
|
throws HeartBeatDAOException {
|
|
|
|
|
PreparedStatement stmt = null;
|
|
|
|
|
ResultSet resultSet = null;
|
|
|
|
|
Map<String, ServerContext> ctxList = new HashMap<>();
|
|
|
|
|
try {
|
|
|
|
|
Connection conn = HeartBeatBeaconDAOFactory.getConnection();
|
|
|
|
@ -234,17 +231,19 @@ public class GenericHeartBeatDAOImpl implements HeartBeatDAO {
|
|
|
|
|
"SERVER_HEART_BEAT_EVENTS, (SELECT @row_number:=-1) AS TEMP " +
|
|
|
|
|
"WHERE LAST_UPDATED_TIMESTAMP > ? " +
|
|
|
|
|
"ORDER BY UUID";
|
|
|
|
|
stmt = conn.prepareStatement(sql);
|
|
|
|
|
stmt.setTimestamp(1, new Timestamp(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)));
|
|
|
|
|
resultSet = stmt.executeQuery();
|
|
|
|
|
while (resultSet.next()) {
|
|
|
|
|
ctxList.put(resultSet.getString("UUID"), HeartBeatBeaconDAOUtil.populateContext(resultSet));
|
|
|
|
|
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
|
|
|
|
|
stmt.setTimestamp(1, new Timestamp(System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(elapsedTimeInSeconds)));
|
|
|
|
|
try (ResultSet resultSet = stmt.executeQuery()) {
|
|
|
|
|
while (resultSet.next()) {
|
|
|
|
|
ctxList.put(resultSet.getString("UUID"), HeartBeatBeaconDAOUtil.populateContext(resultSet));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (SQLException e) {
|
|
|
|
|
throw new HeartBeatDAOException("Error occurred while retrieving acting server count with " +
|
|
|
|
|
"heartbeat updates within " + elapsedTimeInSeconds + " seconds.", e);
|
|
|
|
|
} finally {
|
|
|
|
|
HeartBeatBeaconDAOUtil.cleanupResources(stmt, resultSet);
|
|
|
|
|
String msg = "Error occurred while retrieving acting server count with " +
|
|
|
|
|
"heartbeat updates within " + elapsedTimeInSeconds + " seconds.";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
throw new HeartBeatDAOException(msg, e);
|
|
|
|
|
}
|
|
|
|
|
return ctxList;
|
|
|
|
|
}
|
|
|
|
|