Make the executor Callable and retrieve the response

remotes/1712947555871020102/tmp_refs/heads/master
Pahansith Gunathilake 7 months ago
parent 6c1286ebad
commit b74dc5796b

@ -290,8 +290,6 @@ public class ApplicationManagerProviderServiceImpl implements ApplicationManagem
ReportingPublisherManager reportingManager = new ReportingPublisherManager(); ReportingPublisherManager reportingManager = new ReportingPublisherManager();
reportingManager.publishData(deviceDetailsWrapper, DeviceManagementConstants reportingManager.publishData(deviceDetailsWrapper, DeviceManagementConstants
.Report.APP_USAGE_ENDPOINT); .Report.APP_USAGE_ENDPOINT);
/*HttpReportingUtil.invokeApi(deviceDetailsWrapper.getJSONString(),
reportingHost + DeviceManagementConstants.Report.APP_USAGE_ENDPOINT);*/
} }
} catch (DeviceManagementDAOException e) { } catch (DeviceManagementDAOException e) {

@ -107,7 +107,6 @@ public interface DeviceInformationManager {
* @param deviceType device type of an device * @param deviceType device type of an device
* @param payload payload of the event * @param payload payload of the event
* @param eventType Event type being sent * @param eventType Event type being sent
* @return Http status code if a call is made and if failed to make a call 0
* @throws DeviceDetailsMgtException * @throws DeviceDetailsMgtException
*/ */
int publishEvents(String deviceId, String deviceType, String payload, String eventType) int publishEvents(String deviceId, String deviceType, String payload, String eventType)

@ -54,6 +54,8 @@ import java.util.Calendar;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class DeviceInformationManagerImpl implements DeviceInformationManager { public class DeviceInformationManagerImpl implements DeviceInformationManager {
@ -86,6 +88,7 @@ public class DeviceInformationManagerImpl implements DeviceInformationManager {
DeviceDetailsWrapper deviceDetailsWrapper = new DeviceDetailsWrapper(); DeviceDetailsWrapper deviceDetailsWrapper = new DeviceDetailsWrapper();
deviceDetailsWrapper.setDeviceInfo(deviceInfo); deviceDetailsWrapper.setDeviceInfo(deviceInfo);
//Asynchronous call to publish the device information to the reporting service. Hence, response is ignored.
publishEvents(device, deviceDetailsWrapper, DeviceManagementConstants.Report.DEVICE_INFO_PARAM); publishEvents(device, deviceDetailsWrapper, DeviceManagementConstants.Report.DEVICE_INFO_PARAM);
DeviceManagementDAOFactory.beginTransaction(); DeviceManagementDAOFactory.beginTransaction();
@ -203,13 +206,32 @@ public class DeviceInformationManagerImpl implements DeviceInformationManager {
getDeviceManagementProvider().getDevice(deviceIdentifier, false); getDeviceManagementProvider().getDevice(deviceIdentifier, false);
DeviceDetailsWrapper deviceDetailsWrapper = new DeviceDetailsWrapper(); DeviceDetailsWrapper deviceDetailsWrapper = new DeviceDetailsWrapper();
deviceDetailsWrapper.setEvents(payload); deviceDetailsWrapper.setEvents(payload);
publishEvents(device, deviceDetailsWrapper, eventType); Future<Integer> apiCallback = publishEvents(device, deviceDetailsWrapper, eventType);
return 201; if (null != apiCallback) {
while(!apiCallback.isDone()) {
if (log.isDebugEnabled()) {
log.debug("Waiting for the response from the API for the reporting data " +
"publishing for the device " + deviceId + ". Event payload: " + payload);
}
}
return apiCallback.get();
}
return 0; // If the event publishing is disabled.
} catch (DeviceManagementException e) { } catch (DeviceManagementException e) {
DeviceManagementDAOFactory.rollbackTransaction(); DeviceManagementDAOFactory.rollbackTransaction();
String msg = "Event publishing error. Could not get device " + deviceId; String msg = "Event publishing error. Could not get device " + deviceId;
log.error(msg, e); log.error(msg, e);
throw new DeviceDetailsMgtException(msg, e); throw new DeviceDetailsMgtException(msg, e);
} catch (ExecutionException e) {
String message = "Failed while publishing device information data to the reporting service for the device "
+ deviceId;
log.error(message, e);
throw new DeviceDetailsMgtException(message, e);
} catch (InterruptedException e) {
String message = "Failed while publishing device information data to the reporting service. Thread " +
"interrupted while waiting for the response from the API for the Device " + deviceId;
log.error(message, e);
throw new DeviceDetailsMgtException(message, e);
} }
} }
@ -218,7 +240,7 @@ public class DeviceInformationManagerImpl implements DeviceInformationManager {
* @param device Device that is sending event * @param device Device that is sending event
* @param deviceDetailsWrapper Payload to send(example, deviceinfo, applist, raw events) * @param deviceDetailsWrapper Payload to send(example, deviceinfo, applist, raw events)
*/ */
private void publishEvents(Device device, DeviceDetailsWrapper deviceDetailsWrapper, String private Future<Integer> publishEvents(Device device, DeviceDetailsWrapper deviceDetailsWrapper, String
eventType) { eventType) {
String reportingHost = HttpReportingUtil.getReportingHost(); String reportingHost = HttpReportingUtil.getReportingHost();
if (!StringUtils.isBlank(reportingHost) if (!StringUtils.isBlank(reportingHost)
@ -254,8 +276,7 @@ public class DeviceInformationManagerImpl implements DeviceInformationManager {
String eventUrl = reportingHost + DeviceManagementConstants.Report String eventUrl = reportingHost + DeviceManagementConstants.Report
.REPORTING_CONTEXT + DeviceManagementConstants.URL_SEPERATOR + eventType; .REPORTING_CONTEXT + DeviceManagementConstants.URL_SEPERATOR + eventType;
ReportingPublisherManager reportingManager = new ReportingPublisherManager(); ReportingPublisherManager reportingManager = new ReportingPublisherManager();
reportingManager.publishData(deviceDetailsWrapper, eventUrl); return reportingManager.publishData(deviceDetailsWrapper, eventUrl);
//return HttpReportingUtil.invokeApi(deviceDetailsWrapper.getJSONString(), eventUrl);
} catch (GroupManagementException e) { } catch (GroupManagementException e) {
log.error("Error occurred while getting group list", e); log.error("Error occurred while getting group list", e);
} catch (UserStoreException e) { } catch (UserStoreException e) {
@ -271,6 +292,7 @@ public class DeviceInformationManagerImpl implements DeviceInformationManager {
+ DeviceManagerUtil.getTenantId()); + DeviceManagerUtil.getTenantId());
} }
} }
return null;
} }
@Override @Override

@ -33,8 +33,10 @@ import org.apache.http.protocol.HTTP;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ReportingPublisherManager { public class ReportingPublisherManager {
@ -51,15 +53,15 @@ public class ReportingPublisherManager {
poolingManager.setDefaultMaxPerRoute(10); poolingManager.setDefaultMaxPerRoute(10);
} }
public void publishData(DeviceDetailsWrapper deviceDetailsWrapper, String eventUrl) { public Future<Integer> publishData(DeviceDetailsWrapper deviceDetailsWrapper, String eventUrl) {
this.payload = deviceDetailsWrapper; this.payload = deviceDetailsWrapper;
this.endpoint = eventUrl; this.endpoint = eventUrl;
executorService.submit(new ReportingPublisher()); return executorService.submit(new ReportingPublisher());
} }
private class ReportingPublisher implements Runnable { private class ReportingPublisher implements Callable<Integer> {
@Override @Override
public void run() { public Integer call() throws EventPublishingException {
try (CloseableHttpClient client = HttpClients.custom().setConnectionManager(poolingManager).build()) { try (CloseableHttpClient client = HttpClients.custom().setConnectionManager(poolingManager).build()) {
HttpPost apiEndpoint = new HttpPost(endpoint); HttpPost apiEndpoint = new HttpPost(endpoint);
apiEndpoint.setHeader(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()); apiEndpoint.setHeader(HTTP.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
@ -70,12 +72,15 @@ public class ReportingPublisherManager {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Published data to the reporting backend: " + endpoint + ", Response code: " + statusCode); log.debug("Published data to the reporting backend: " + endpoint + ", Response code: " + statusCode);
} }
return statusCode;
} catch (ConnectException e) { } catch (ConnectException e) {
String message = "Connection refused while publishing reporting data to the API: " + endpoint; String message = "Connection refused while publishing reporting data to the API: " + endpoint;
log.error(message, e); log.error(message, e);
throw new EventPublishingException(message, e);
} catch (IOException e) { } catch (IOException e) {
String message = "Error occurred when publishing reporting data to the API: " + endpoint; String message = "Error occurred when publishing reporting data to the API: " + endpoint;
log.error(message, e); log.error(message, e);
throw new EventPublishingException(message, e);
} }
} }
} }

Loading…
Cancel
Save