Changes to configure insecure endpoint and send notification thorugh the stream

notification-11094
Thasleem Sulaiman 2 months ago
parent b98ae89ebe
commit ef57b97f48

@ -29,6 +29,10 @@ import javax.validation.constraints.Size;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
/**
* Notifications related REST-API.
@ -64,6 +68,13 @@ import javax.ws.rs.core.Response;
key = "dm:notif:mark-checked",
roles = {"Internal/devicemgt-user"},
permissions = {"/device-mgt/notifications/update"}
),
@Scope(
name = "Streaming Device Notifications",
description = "Real-time streaming of device notifications",
key = "dm:notifications:stream",
roles = {"Internal/devicemgt-user"},
permissions = {"/device-mgt/notifications/stream"}
)
}
)
@ -226,4 +237,48 @@ public interface NotificationManagementService {
}
)
Response clearAllNotifications();
/**
* SSE endpoint to send real-time notifications to the client.
* @return StreamingOutput for SSE response.
*/
@GET
@Path("/stream")
@Produces("text/event-stream")
@ApiOperation(
value = "Stream Real-Time Notifications",
notes = "Streams real-time notifications to the client via Server-Sent Events.",
response = StreamingOutput.class,
extensions = {
@Extension(properties = {
@ExtensionProperty(name = "scope", value = "dm:notifications:stream")
})
}
)
default Response streamNotifications() {
StreamingOutput streamingOutput = new StreamingOutput() {
public void write(OutputStream output) throws IOException {
String notification = "data: {\"message\": \"New Notification\"}\n\n";
while (true) {
try {
System.out.println("Sending the notification: " + notification);
output.write(notification.getBytes(StandardCharsets.UTF_8));
output.flush();
Thread.sleep(5000);
} catch (InterruptedException e) {
break;
}
}
}
};
return Response.ok(streamingOutput)
.header("Cache-Control", "no-cache")
.header("Connection", "keep-alive")
.header("Content-Type", "text/event-stream;charset=UTF-8")
.header("Access-Control-Allow-Origin", "*")
.build();
}
}

@ -52,6 +52,7 @@
/api/device-mgt/v1.0/whitelabel/.*/favicon,
/api/device-mgt/v1.0/whitelabel/.*/logo,
/api/device-mgt/v1.0/whitelabel/.*/icon,
/api/device-mgt/v1.0/notifications/stream
</param-value>
</context-param>

@ -129,6 +129,37 @@
<groupId>org.wso2.orbit.javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.24.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.24.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.hk2.external</groupId>
<artifactId>javax.inject</artifactId>
<version>2.2.0-b21</version>
<scope>compile</scope>
</dependency>
</dependencies>

@ -1,49 +0,0 @@
package io.entgra.device.mgt.core.device.mgt.common.notification.mgt;
import java.util.concurrent.*;
public class NotificationWorker {
private final BlockingQueue<Notification> taskQueue;
private final ThreadPoolExecutor executor;
private boolean isInitialized = false;
public NotificationWorker() {
this.taskQueue = new LinkedBlockingQueue<>();
this.executor = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}
private synchronized void startWorker() {
if (!isInitialized) {
isInitialized = true;
System.out.println("Notification Service Worker Thread initialized.");
executor.submit(() -> {
try {
while (true) {
Notification nextTask = taskQueue.take();
System.out.println("New task added; processing in a separate thread.");
executor.submit(() -> processNotification(nextTask));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Notification processing thread was interrupted, terminating.");
}
});
}
}
public synchronized void addNotificationTask(Notification notification) {
taskQueue.offer(notification);
startWorker();
}
private void processNotification(Notification notification) {
try {
System.out.println("Processing task: " + notification);
} catch (Exception e) {
System.err.println("Failed to process notification: " + notification + " due to " + e.getMessage());
}
//The logic should be included in the service layer it will be moved in the relevant milestone --> SSE through notification service
}
}

@ -428,6 +428,7 @@
<Scope>dm:admin:cea:sync</Scope>
<Scope>am:pub:app:upload</Scope>
<Scope>dm:devices:ops:status:update</Scope>
<Scope>dm:notifications:stream</Scope>
</Scopes>
<SSOConfiguration>
<Issuer>device-mgt</Issuer>

Loading…
Cancel
Save