From b98ae89ebe9f86e18ca3cf3caec29eb59fb655d0 Mon Sep 17 00:00:00 2001 From: thasleem Date: Wed, 6 Nov 2024 16:15:20 +0530 Subject: [PATCH] Implement worker thread for notification service --- .../notification/mgt/NotificationWorker.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 components/device-mgt/io.entgra.device.mgt.core.device.mgt.common/src/main/java/io/entgra/device/mgt/core/device/mgt/common/notification/mgt/NotificationWorker.java diff --git a/components/device-mgt/io.entgra.device.mgt.core.device.mgt.common/src/main/java/io/entgra/device/mgt/core/device/mgt/common/notification/mgt/NotificationWorker.java b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.common/src/main/java/io/entgra/device/mgt/core/device/mgt/common/notification/mgt/NotificationWorker.java new file mode 100644 index 0000000000..4711190df4 --- /dev/null +++ b/components/device-mgt/io.entgra.device.mgt.core.device.mgt.common/src/main/java/io/entgra/device/mgt/core/device/mgt/common/notification/mgt/NotificationWorker.java @@ -0,0 +1,49 @@ +package io.entgra.device.mgt.core.device.mgt.common.notification.mgt; + +import java.util.concurrent.*; + +public class NotificationWorker { + private final BlockingQueue taskQueue; + private final ThreadPoolExecutor executor; + private boolean isInitialized = false; + + public NotificationWorker() { + this.taskQueue = new LinkedBlockingQueue<>(); + this.executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); + } + + private synchronized void startWorker() { + if (!isInitialized) { + isInitialized = true; + System.out.println("Notification Service Worker Thread initialized."); + + executor.submit(() -> { + try { + while (true) { + Notification nextTask = taskQueue.take(); + System.out.println("New task added; processing in a separate thread."); + executor.submit(() -> processNotification(nextTask)); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.out.println("Notification processing thread was interrupted, terminating."); + } + }); + } + } + + public synchronized void addNotificationTask(Notification notification) { + taskQueue.offer(notification); + startWorker(); + } + + private void processNotification(Notification notification) { + try { + System.out.println("Processing task: " + notification); + } catch (Exception e) { + System.err.println("Failed to process notification: " + notification + " due to " + e.getMessage()); + } + //The logic should be included in the service layer it will be moved in the relevant milestone --> SSE through notification service + } + +}