Merge pull request 'developed send the existing emails' (#1) from develop into master

Reviewed-on: #1
pull/7/head
commit d3cf7ad335

@ -8,8 +8,13 @@ import java.util.PriorityQueue;
public interface MailDAO { public interface MailDAO {
void addMail(MailModel mailModel); void addMail(MailModel mailModel);
PriorityQueue<MailModel> getMailwithHighPriority();
// List<MailModel> insertInPriorityQueue();
MailModel getMailDetails(Integer mail_id); MailModel getMailDetails(Integer mail_id);
void addToSentMail(Integer mail_id); void addToSentMail(Integer mail_id);
List<MailModel> getUnsentMessages();
} }

@ -11,14 +11,14 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.sql.*; import java.sql.*;
import java.time.LocalDate; import java.time.LocalDate;
import java.util.Comparator; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.PriorityQueue;
import java.util.logging.Logger; import java.util.logging.Logger;
@Service @Service
public class MailDAOImpl implements MailDAO{ public class MailDAOImpl implements MailDAO{
Logger logger = Logger.getLogger(String.valueOf(MailDAOImpl.class)); Logger logger = Logger.getLogger(String.valueOf(MailDAOImpl.class));
private Connection getConnection() throws SQLException { private Connection getConnection() throws SQLException {
@ -54,40 +54,62 @@ public void addMail(MailModel mailModel){
} }
} }
//@Override
//public List<MailModel> insertInPriorityQueue(){
// logger.info("came into the function");
//
//
// try{
// Connection conn = this.getConnection();
//
// try (PreparedStatement stmt = conn.prepareStatement(sql);
// ResultSet rs = stmt.executeQuery()) {
// while (rs.next()) {
// int priority = rs.getInt("priority");
// int mail_id = rs.getInt("email_id");
// priorityQueue.add(new MailModel(mail_id,null,null,priority,null,null,null,null,null,0));
// }
//
//// MailModel email = priorityQueue.poll();
// //assert email != null;
// for (Object element: priorityQueue){
// System.out.println("element" + element);
// }
// return priorityQueue;
//
// } catch (SQLException e) {
// logger.info("Error processing result set: " + e.getMessage());
// throw new RuntimeException(e);
// }
//
//
// } catch (Exception e) {
// logger.info("Error in preparing statement" + e);
// throw new RuntimeException(e);
// }
//
//}
@Override @Override
public PriorityQueue<MailModel> getMailwithHighPriority(){ public List<MailModel> getUnsentMessages(){
logger.info("came into the function"); String sql = "SELECT * FROM email WHERE email_id NOT IN (SELECT email_id FROM sentEmail)";
String sql = "SELECT email_id,priority FROM email WHERE email_id NOT IN (SELECT email_id FROM sentEmail)";
PriorityQueue<MailModel> priorityQueue = new PriorityQueue<>(Comparator.comparingInt(MailModel::getPriority));
try{ List<MailModel> unsentMails = new ArrayList<>();
Connection conn = this.getConnection();
try (PreparedStatement stmt = conn.prepareStatement(sql); try(Connection connection = this.getConnection();
ResultSet rs = stmt.executeQuery()) { PreparedStatement statement = connection.prepareStatement(sql);
ResultSet rs = statement.executeQuery()) {
while (rs.next()){ while (rs.next()){
int priority = rs.getInt("priority"); MailModel mailModel = new MailModel();
int mail_id = rs.getInt("email_id"); mailModel.setEmailId(rs.getInt("email_id"));
priorityQueue.add(new MailModel(mail_id,null,null,priority,null,null,null,null,null)); mailModel.setPriority(rs.getInt("priority"));
} mailModel.setExpiry_at(rs.getDate("expiry_at"));
unsentMails.add(mailModel);
// MailModel email = priorityQueue.poll();
//assert email != null;
for (Object element: priorityQueue){
System.out.println("element" + element);
} }
return priorityQueue;
} catch (SQLException e) { } catch (SQLException e) {
logger.info("Error processing result set: " + e.getMessage());
throw new RuntimeException(e);
}
} catch (Exception e) {
logger.info("Error in preparing statement" + e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return unsentMails;
} }
@ -102,6 +124,7 @@ public MailModel getMailDetails(Integer mail_id){
stmt.setInt(1,mail_id); stmt.setInt(1,mail_id);
try(ResultSet rs = stmt.executeQuery()) { try(ResultSet rs = stmt.executeQuery()) {
while (rs.next()){ while (rs.next()){
logger.info(rs.getString("email_address"));
mailModel.setEmailId(rs.getInt("email_id")); mailModel.setEmailId(rs.getInt("email_id"));
mailModel.setEmailAddress(rs.getString("email_address")); mailModel.setEmailAddress(rs.getString("email_address"));

@ -26,6 +26,8 @@ public class MailModel {
private File file; private File file;
private String filename; private String filename;
private java.sql.Date expiry_at; private java.sql.Date expiry_at;
private long insertionOrder;
@Setter @Setter

@ -1,13 +1,23 @@
package entgra.mailsender; package entgra.mailsender;
import entgra.mailsender.Service.MailService;
import entgra.mailsender.Service.MailServiceImpl;
import jakarta.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication @SpringBootApplication
public class MailsenderApplication { public class MailsenderApplication {
@Autowired
private MailService mailService;
public static void main(String[] args) { public static void main(String[] args) {
SpringApplication.run(MailsenderApplication.class, args); SpringApplication.run(MailsenderApplication.class, args);
} }
@PostConstruct
public void init(){
mailService.syncMailWithDB();
}
} }

@ -0,0 +1,12 @@
package entgra.mailsender.Service;
import entgra.mailsender.DTO.MailModel;
import java.util.List;
public interface MailQueueService {
void enqueMails(List<MailModel> mailModelList);
MailModel getHighPriorityMail();
}

@ -0,0 +1,79 @@
package entgra.mailsender.Service;
import entgra.mailsender.DAO.MailDAO;
import entgra.mailsender.DTO.MailModel;
import entgra.mailsender.util.PriorityQueueHolder;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
@Service
@RequiredArgsConstructor
public class MailQueueServiceImpl implements MailQueueService{
Logger logger = Logger.getLogger(String.valueOf(MailQueueServiceImpl.class));
@Autowired
MailDAO mailDAO;
// private volatile Thread
// processingThread;
// private volatile boolean isProcessingMails;
// private final MailDAO mailDAO;
@Override
public void enqueMails(List<MailModel> mailModelList){
logger.info("came here");
BlockingQueue<MailModel> priorityQueue = PriorityQueueHolder.getInstance().getPriorityQueue();
AtomicLong insertionOrderCounter = PriorityQueueHolder.getInstance().getInsertionOrderCounter();
Set<MailModel> uniqueMails = PriorityQueueHolder.getInstance().getUniqueMails();
for (MailModel mailModel : mailModelList){
if (!PriorityQueueHolder.getInstance().getUniqueMails().contains(mailModel)){
mailModel.setInsertionOrder(insertionOrderCounter.getAndIncrement());
priorityQueue.add(mailModel);
uniqueMails.add(mailModel);
}
}
// assert priorityQueue.peek() != null;
// logger.info(String.valueOf(priorityQueue.peek().getEmailId()));
// Integer id = priorityQueue.peek().getEmailId();
// logger.info(mailDAO.getMailDetails(id).getEmailAddress());
// logger.info((Supplier<String>) uniqueMails);
}
public MailModel getHighPriorityMail(){
BlockingQueue<MailModel> priorityQueue = PriorityQueueHolder.getInstance().getPriorityQueue();
Set<MailModel> uniqueMails = PriorityQueueHolder.getInstance().getUniqueMails();
if (PriorityQueueHolder.getInstance().getPriorityQueue().peek() != null){
MailModel mailModel = priorityQueue.poll();
uniqueMails.remove(mailModel);
//logger.info(String.valueOf(mailModel.getEmailId()));
return mailModel;
}
return null;
}
// public void clearOnServerShutdown(){
// BlockingQueue<MailModel> priorityQueue = PriorityQueueHolder.getInstance().getPriorityQueue();
// if (processingThread != null){
// while (isProcessingMails){
// try {
// Thread.sleep(1000);
// }catch (InterruptedException e){
// Thread.currentThread().interrupt();
// logger.info("Error waiting for the processing cycle to complete" + e.getMessage());
// }
// isProcessingMails = priorityQueue.peek() != null;
// }
// processingThread.interrupt();
// }
// logger.info("Server shutdown");
// }
}

@ -12,6 +12,8 @@ public interface MailService {
void sendEmail(MailModel emailModel) throws MessagingException, IOException; void sendEmail(MailModel emailModel) throws MessagingException, IOException;
boolean isValidEmailAddress(String email); boolean isValidEmailAddress(String email);
void syncMailWithDB();
} }

@ -2,13 +2,14 @@ package entgra.mailsender.Service;
import entgra.mailsender.DAO.MailDAO; import entgra.mailsender.DAO.MailDAO;
import entgra.mailsender.DTO.MailModel; import entgra.mailsender.DTO.MailModel;
import entgra.mailsender.util.PriorityQueueHolder;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mail.javamail.JavaMailSender; import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper; import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; import java.util.*;
import java.util.PriorityQueue; import java.util.function.Supplier;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -21,9 +22,12 @@ public class MailServiceImpl implements MailService {
@Autowired @Autowired
private MailDAO mailDAO; private MailDAO mailDAO;
@Autowired
private MailQueueService mailQueueService;
public void sendEmail(MailModel emailModel){ public void sendEmail(MailModel emailModel){
logger.warning("email address : "+emailModel.getEmailAddress()); logger.warning("email address : "+emailModel.getEmailAddress());
if (isValidEmailAddress(emailModel.getEmailAddress())){ if (isValidEmailAddress(emailModel.getEmailAddress())){
throw new RuntimeException("Invalid Email address 161616161"); throw new RuntimeException("Invalid Email address 161616161");
@ -31,18 +35,42 @@ public class MailServiceImpl implements MailService {
mailDAO.addMail(emailModel); // save the mail details in the database mailDAO.addMail(emailModel); // save the mail details in the database
//here we need to check the unsent mail details //here we need to check the unsent mail details
PriorityQueue<MailModel> mailwithHighPriority = mailDAO.getMailwithHighPriority(); //PriorityQueue<MailModel> mailwithHighPriority = mailDAO.getMailwithHighPriority();
MailModel mailModel = mailwithHighPriority.poll(); // MailModel mailModel = mailQueueService.getHighPriorityMail();
assert mailModel != null; // assert mailModel != null;
Integer mail_id = mailModel.getEmailId(); // Integer mail_id = Integer.valueOf(mailModel.getEmailAddress());
// logger.info(mailModel.getEmailAddress());
// MailModel updatedModel = mailDAO.getMailDetails(mail_id);
// logger.info("mail id" + updatedModel.getEmailAddress());
MailModel updatedModel = mailDAO.getMailDetails(mail_id);
logger.info(updatedModel.getEmailAddress());
}
public boolean isValidEmailAddress(String email){
String regex = "^[a-zA-Z0-9_+&*-]+(?:\\.[a-zA-Z0-9_+&*-]+)*@(?:[a-zA-Z0-9-]+\\.)+[a-zA-Z]{2,7}$";
return !email.matches(regex);
}
public void syncMailWithDB(){
try {
List<MailModel> mailModels = mailDAO.getUnsentMessages();
if (!mailModels.isEmpty()){
mailQueueService.enqueMails(mailModels);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
while(PriorityQueueHolder.getInstance().getPriorityQueue().peek() != null) {
MailModel prioritizedMail = mailQueueService.getHighPriorityMail();
MailModel updatedModel = mailDAO.getMailDetails(prioritizedMail.getEmailId());
logger.info("updated mail" + updatedModel.getEmailAddress());
javaMailSender.send(mimeMessage -> { javaMailSender.send(mimeMessage -> {
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage, true); MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage, true);
@ -73,11 +101,9 @@ public class MailServiceImpl implements MailService {
mailDAO.addToSentMail(updatedModel.getEmailId()); mailDAO.addToSentMail(updatedModel.getEmailId());
});
} }
public boolean isValidEmailAddress(String email){ );
String regex = "^[a-zA-Z0-9_+&*-]+(?:\\.[a-zA-Z0-9_+&*-]+)*@(?:[a-zA-Z0-9-]+\\.)+[a-zA-Z]{2,7}$"; }
return !email.matches(regex);
} }

@ -0,0 +1,31 @@
package entgra.mailsender.util;
import entgra.mailsender.DTO.MailModel;
import lombok.Getter;
import java.util.Comparator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
public class PriorityQueueHolder {
// Singleton instance
@Getter
private static final PriorityQueueHolder instance = new PriorityQueueHolder();
@Getter
private final AtomicLong insertionOrderCounter = new AtomicLong(1);
// PriorityBlockingQueue is thread-safe and supports ordering elements based on their natural order or a custom comparator.
@Getter
private final BlockingQueue<MailModel> priorityQueue = new PriorityBlockingQueue<>(100, Comparator
.comparingLong(MailModel::getPriority)
.thenComparing(MailModel::getExpiry_at)
.thenComparing(MailModel::getInsertionOrder));
@Getter
private final Set<MailModel> uniqueMails = ConcurrentHashMap.newKeySet();
}
Loading…
Cancel
Save