diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.analytics.wsproxy/src/main/java/org/wso2/carbon/device/mgt/analytics/wsproxy/inbound/SubscriptionEndpoint.java b/components/device-mgt/org.wso2.carbon.device.mgt.analytics.wsproxy/src/main/java/org/wso2/carbon/device/mgt/analytics/wsproxy/inbound/SubscriptionEndpoint.java index 14db789fa9d..b556949ae13 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.analytics.wsproxy/src/main/java/org/wso2/carbon/device/mgt/analytics/wsproxy/inbound/SubscriptionEndpoint.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.analytics.wsproxy/src/main/java/org/wso2/carbon/device/mgt/analytics/wsproxy/inbound/SubscriptionEndpoint.java @@ -49,7 +49,7 @@ public class SubscriptionEndpoint { * * @param session - Users registered session. */ - public void onOpen(Session session) { + void onOpen(Session session) { if (log.isDebugEnabled()) { log.debug("WebSocket opened, for Session id: " + session.getId()); } @@ -68,8 +68,7 @@ public class SubscriptionEndpoint { endpoint += "/"; } endpoint += session.getRequestURI().getSchemeSpecificPart().replace("secured-websocket-proxy",""); - AnalyticsClient analyticsClient = new AnalyticsClient(session); - analyticsClient.connectClient(new URI(endpoint)); + AnalyticsClient analyticsClient = new AnalyticsClient(session, new URI(endpoint)); analyticsClients.add(analyticsClient); } catch (URISyntaxException e) { log.error("Unable to create URL from: " + endpoint, e); @@ -121,7 +120,7 @@ public class SubscriptionEndpoint { * @param session - Users registered session. * @param message - Status code for web-socket close. */ - public void onMessage(Session session, String message) { + void onMessage(Session session, String message) { for (AnalyticsClient analyticsClient : analyticsClientsMap.get(session.getId())) { if (analyticsClient != null) { analyticsClient.sendMessage(message); diff --git a/components/device-mgt/org.wso2.carbon.device.mgt.analytics.wsproxy/src/main/java/org/wso2/carbon/device/mgt/analytics/wsproxy/outbound/AnalyticsClient.java b/components/device-mgt/org.wso2.carbon.device.mgt.analytics.wsproxy/src/main/java/org/wso2/carbon/device/mgt/analytics/wsproxy/outbound/AnalyticsClient.java index 7bfd480288e..96e6d6974a9 100644 --- a/components/device-mgt/org.wso2.carbon.device.mgt.analytics.wsproxy/src/main/java/org/wso2/carbon/device/mgt/analytics/wsproxy/outbound/AnalyticsClient.java +++ b/components/device-mgt/org.wso2.carbon.device.mgt.analytics.wsproxy/src/main/java/org/wso2/carbon/device/mgt/analytics/wsproxy/outbound/AnalyticsClient.java @@ -42,24 +42,18 @@ public class AnalyticsClient { private static final Log log = LogFactory.getLog(AnalyticsClient.class); - private WebSocketContainer container; - private Session analyticsSession = null; - private Session clientSession; + private final Session analyticsSession; + private final Session clientSession; /** * Create {@link AnalyticsClient} instance. */ - public AnalyticsClient(Session clientSession) { - container = ContainerProvider.getWebSocketContainer(); + public AnalyticsClient(Session clientSession, URI endpointURI) throws WSProxyException { + WebSocketContainer container = ContainerProvider.getWebSocketContainer(); this.clientSession = clientSession; - } - /** - * Create web socket client connection using {@link WebSocketContainer}. - */ - public void connectClient(URI endpointURI) throws WSProxyException { try { - analyticsSession = container.connectToServer(this, endpointURI); + this.analyticsSession = container.connectToServer(this, endpointURI); } catch (DeploymentException | IOException e) { String msg = "Error occurred while connecting to remote endpoint " + endpointURI.toString(); log.error(msg, e); @@ -79,7 +73,6 @@ public class AnalyticsClient { log.debug("Closing web socket session: '" + userSession.getId() + "'. Code: " + reason.getCloseCode().toString() + " Reason: " + reason.getReasonPhrase()); } - this.analyticsSession = null; } /** @@ -91,7 +84,16 @@ public class AnalyticsClient { */ @OnMessage public void onMessage(String message) { - this.clientSession.getAsyncRemote().sendText(message); + synchronized (this.clientSession) { + try { + this.clientSession.getBasicRemote().sendText(message); + } catch (IOException e) { + log.warn("Sending message to client failed due to " + e.getMessage()); + if (log.isDebugEnabled()) { + log.debug("Full stack trace:", e); + } + } + } } /** @@ -100,14 +102,23 @@ public class AnalyticsClient { * @param message the message which is going to send. */ public void sendMessage(String message) { - this.analyticsSession.getAsyncRemote().sendText(message); + synchronized (this.analyticsSession) { + try { + this.analyticsSession.getBasicRemote().sendText(message); + } catch (IOException e) { + log.warn("Sending message to analytics failed due to " + e.getMessage()); + if (log.isDebugEnabled()) { + log.debug("Full stack trace:", e); + } + } + } } /** * Close current connection. */ public void closeConnection(CloseReason closeReason) throws WSProxyException { - if (this.analyticsSession != null) { + if (this.analyticsSession.isOpen()) { try { this.analyticsSession.close(closeReason); } catch (IOException e) { @@ -115,6 +126,8 @@ public class AnalyticsClient { log.error(msg, e); throw new WSProxyException(msg, e); } + } else { + log.warn("Analytics session '" + this.analyticsSession.getId() + "' is already closed"); } } }