parent
1a6de77a82
commit
a8ce4818f2
After Width: | Height: | Size: 854 KiB |
@ -1,13 +1,23 @@
|
|||||||
package entgra.mailsender;
|
package entgra.mailsender;
|
||||||
|
|
||||||
|
import entgra.mailsender.Service.MailService;
|
||||||
|
import entgra.mailsender.Service.MailServiceImpl;
|
||||||
|
import jakarta.annotation.PostConstruct;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.SpringApplication;
|
import org.springframework.boot.SpringApplication;
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
|
||||||
@SpringBootApplication
|
@SpringBootApplication
|
||||||
public class MailsenderApplication {
|
public class MailsenderApplication {
|
||||||
|
@Autowired
|
||||||
|
private MailService mailService;
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
SpringApplication.run(MailsenderApplication.class, args);
|
SpringApplication.run(MailsenderApplication.class, args);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
public void init(){
|
||||||
|
mailService.syncMailWithDB();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,12 @@
|
|||||||
|
package entgra.mailsender.Service;
|
||||||
|
|
||||||
|
import entgra.mailsender.DTO.MailModel;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public interface MailQueueService {
|
||||||
|
|
||||||
|
void enqueMails(List<MailModel> mailModelList);
|
||||||
|
MailModel getHighPriorityMail();
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,79 @@
|
|||||||
|
package entgra.mailsender.Service;
|
||||||
|
|
||||||
|
import entgra.mailsender.DAO.MailDAO;
|
||||||
|
import entgra.mailsender.DTO.MailModel;
|
||||||
|
import entgra.mailsender.util.PriorityQueueHolder;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class MailQueueServiceImpl implements MailQueueService{
|
||||||
|
Logger logger = Logger.getLogger(String.valueOf(MailQueueServiceImpl.class));
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
MailDAO mailDAO;
|
||||||
|
|
||||||
|
// private volatile Thread
|
||||||
|
// processingThread;
|
||||||
|
// private volatile boolean isProcessingMails;
|
||||||
|
// private final MailDAO mailDAO;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void enqueMails(List<MailModel> mailModelList){
|
||||||
|
logger.info("came here");
|
||||||
|
BlockingQueue<MailModel> priorityQueue = PriorityQueueHolder.getInstance().getPriorityQueue();
|
||||||
|
AtomicLong insertionOrderCounter = PriorityQueueHolder.getInstance().getInsertionOrderCounter();
|
||||||
|
Set<MailModel> uniqueMails = PriorityQueueHolder.getInstance().getUniqueMails();
|
||||||
|
for (MailModel mailModel : mailModelList){
|
||||||
|
if (!PriorityQueueHolder.getInstance().getUniqueMails().contains(mailModel)){
|
||||||
|
mailModel.setInsertionOrder(insertionOrderCounter.getAndIncrement());
|
||||||
|
priorityQueue.add(mailModel);
|
||||||
|
uniqueMails.add(mailModel);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// assert priorityQueue.peek() != null;
|
||||||
|
// logger.info(String.valueOf(priorityQueue.peek().getEmailId()));
|
||||||
|
// Integer id = priorityQueue.peek().getEmailId();
|
||||||
|
// logger.info(mailDAO.getMailDetails(id).getEmailAddress());
|
||||||
|
// logger.info((Supplier<String>) uniqueMails);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MailModel getHighPriorityMail(){
|
||||||
|
BlockingQueue<MailModel> priorityQueue = PriorityQueueHolder.getInstance().getPriorityQueue();
|
||||||
|
Set<MailModel> uniqueMails = PriorityQueueHolder.getInstance().getUniqueMails();
|
||||||
|
if (PriorityQueueHolder.getInstance().getPriorityQueue().peek() != null){
|
||||||
|
MailModel mailModel = priorityQueue.poll();
|
||||||
|
uniqueMails.remove(mailModel);
|
||||||
|
//logger.info(String.valueOf(mailModel.getEmailId()));
|
||||||
|
return mailModel;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// public void clearOnServerShutdown(){
|
||||||
|
// BlockingQueue<MailModel> priorityQueue = PriorityQueueHolder.getInstance().getPriorityQueue();
|
||||||
|
// if (processingThread != null){
|
||||||
|
// while (isProcessingMails){
|
||||||
|
// try {
|
||||||
|
// Thread.sleep(1000);
|
||||||
|
// }catch (InterruptedException e){
|
||||||
|
// Thread.currentThread().interrupt();
|
||||||
|
// logger.info("Error waiting for the processing cycle to complete" + e.getMessage());
|
||||||
|
// }
|
||||||
|
// isProcessingMails = priorityQueue.peek() != null;
|
||||||
|
// }
|
||||||
|
// processingThread.interrupt();
|
||||||
|
// }
|
||||||
|
// logger.info("Server shutdown");
|
||||||
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,31 @@
|
|||||||
|
package entgra.mailsender.util;
|
||||||
|
|
||||||
|
import entgra.mailsender.DTO.MailModel;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.PriorityBlockingQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
public class PriorityQueueHolder {
|
||||||
|
|
||||||
|
// Singleton instance
|
||||||
|
@Getter
|
||||||
|
private static final PriorityQueueHolder instance = new PriorityQueueHolder();
|
||||||
|
@Getter
|
||||||
|
private final AtomicLong insertionOrderCounter = new AtomicLong(1);
|
||||||
|
|
||||||
|
// PriorityBlockingQueue is thread-safe and supports ordering elements based on their natural order or a custom comparator.
|
||||||
|
@Getter
|
||||||
|
private final BlockingQueue<MailModel> priorityQueue = new PriorityBlockingQueue<>(100, Comparator
|
||||||
|
.comparingLong(MailModel::getPriority)
|
||||||
|
.thenComparing(MailModel::getExpiry_at)
|
||||||
|
.thenComparing(MailModel::getInsertionOrder));
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private final Set<MailModel> uniqueMails = ConcurrentHashMap.newKeySet();
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in new issue