|
|
|
@ -76,8 +76,47 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
private static final String MQTT_CONTENT_VALIDATOR = "default";
|
|
|
|
|
private static final String TIMESTAMP_FIELD_NAME = "_timestamp";
|
|
|
|
|
|
|
|
|
|
private static AnalyticsDataAPI getAnalyticsDataAPI() {
|
|
|
|
|
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
|
|
|
|
|
AnalyticsDataAPI analyticsDataAPI =
|
|
|
|
|
(AnalyticsDataAPI) ctx.getOSGiService(AnalyticsDataAPI.class, null);
|
|
|
|
|
if (analyticsDataAPI == null) {
|
|
|
|
|
String msg = "Analytics api service has not initialized.";
|
|
|
|
|
log.error(msg);
|
|
|
|
|
throw new IllegalStateException(msg);
|
|
|
|
|
}
|
|
|
|
|
return analyticsDataAPI;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static EventRecords getAllEventsForDevice(String tableName, String query, List<SortByField> sortByFields
|
|
|
|
|
, int offset, int limit) throws AnalyticsException {
|
|
|
|
|
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
|
|
|
|
|
AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI();
|
|
|
|
|
EventRecords eventRecords = new EventRecords();
|
|
|
|
|
int eventCount = analyticsDataAPI.searchCount(tenantId, tableName, query);
|
|
|
|
|
if (eventCount == 0) {
|
|
|
|
|
eventRecords.setCount(0);
|
|
|
|
|
}
|
|
|
|
|
List<SearchResultEntry> resultEntries = analyticsDataAPI.search(tenantId, tableName, query, offset, limit,
|
|
|
|
|
sortByFields);
|
|
|
|
|
List<String> recordIds = getRecordIds(resultEntries);
|
|
|
|
|
AnalyticsDataResponse response = analyticsDataAPI.get(tenantId, tableName, 1, null, recordIds);
|
|
|
|
|
eventRecords.setCount(eventCount);
|
|
|
|
|
eventRecords.setList(AnalyticsDataAPIUtil.listRecords(analyticsDataAPI, response));
|
|
|
|
|
return eventRecords;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static List<String> getRecordIds(List<SearchResultEntry> searchResults) {
|
|
|
|
|
List<String> ids = new ArrayList<>();
|
|
|
|
|
for (SearchResultEntry searchResult : searchResults) {
|
|
|
|
|
ids.add(searchResult.getId());
|
|
|
|
|
}
|
|
|
|
|
return ids;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Retrieves the stream definition from das for the given device type.
|
|
|
|
|
*
|
|
|
|
|
* @return dynamic event attribute list
|
|
|
|
|
*/
|
|
|
|
|
@GET
|
|
|
|
@ -229,9 +268,11 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
}
|
|
|
|
|
eventStreamAdminServiceStub.removeEventStreamDefinition(streamName, Constants.DEFAULT_STREAM_VERSION);
|
|
|
|
|
EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler =
|
|
|
|
|
new EventReceiverAdminServiceCallbackHandler() {};
|
|
|
|
|
new EventReceiverAdminServiceCallbackHandler() {
|
|
|
|
|
};
|
|
|
|
|
EventPublisherAdminServiceCallbackHandler eventPublisherAdminServiceCallbackHandler =
|
|
|
|
|
new EventPublisherAdminServiceCallbackHandler() {};
|
|
|
|
|
new EventPublisherAdminServiceCallbackHandler() {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String eventReceiverName = getReceiverName(deviceType, tenantDomain, TransportType.MQTT);
|
|
|
|
@ -345,7 +386,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
@GET
|
|
|
|
|
@Path("/last-known/{type}/{deviceId}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response getLastKnownData(@PathParam("deviceId") String deviceId, @PathParam("type") String deviceType,@QueryParam("limit") int limit) {
|
|
|
|
|
public Response getLastKnownData(@PathParam("deviceId") String deviceId, @PathParam("type") String deviceType, @QueryParam("limit") int limit) {
|
|
|
|
|
String query = DEFAULT_META_DEVICE_ID_ATTRIBUTE + ":" + deviceId;
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
String sensorTableName = getTableName(DeviceMgtAPIUtils.getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
@ -363,10 +404,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
SortByField sortByField = new SortByField(TIMESTAMP_FIELD_NAME, SortType.DESC);
|
|
|
|
|
sortByFields.add(sortByField);
|
|
|
|
|
if(limit == 0){
|
|
|
|
|
if (limit == 0) {
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 1);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();}
|
|
|
|
|
else{
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
|
} else {
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, limit);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
|
}
|
|
|
|
@ -384,7 +425,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void publishEventReceivers(String streamNameWithVersion, TransportType transportType
|
|
|
|
|
, String requestedTenantDomain, String deviceType)
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
@ -399,7 +439,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
.getActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved);
|
|
|
|
|
if (eventReceiverConfigurationDto != null) {
|
|
|
|
|
EventReceiverAdminServiceCallbackHandler eventReceiverAdminServiceCallbackHandler =
|
|
|
|
|
new EventReceiverAdminServiceCallbackHandler() {};
|
|
|
|
|
new EventReceiverAdminServiceCallbackHandler() {
|
|
|
|
|
};
|
|
|
|
|
receiverAdminServiceStub.startundeployActiveEventReceiverConfiguration(eventRecieverNameTobeRemoved
|
|
|
|
|
, eventReceiverAdminServiceCallbackHandler);
|
|
|
|
|
}
|
|
|
|
@ -545,44 +586,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-" + transportType.toString() + "-receiver";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static AnalyticsDataAPI getAnalyticsDataAPI() {
|
|
|
|
|
PrivilegedCarbonContext ctx = PrivilegedCarbonContext.getThreadLocalCarbonContext();
|
|
|
|
|
AnalyticsDataAPI analyticsDataAPI =
|
|
|
|
|
(AnalyticsDataAPI) ctx.getOSGiService(AnalyticsDataAPI.class, null);
|
|
|
|
|
if (analyticsDataAPI == null) {
|
|
|
|
|
String msg = "Analytics api service has not initialized.";
|
|
|
|
|
log.error(msg);
|
|
|
|
|
throw new IllegalStateException(msg);
|
|
|
|
|
}
|
|
|
|
|
return analyticsDataAPI;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static EventRecords getAllEventsForDevice(String tableName, String query, List<SortByField> sortByFields
|
|
|
|
|
, int offset, int limit) throws AnalyticsException {
|
|
|
|
|
int tenantId = CarbonContext.getThreadLocalCarbonContext().getTenantId();
|
|
|
|
|
AnalyticsDataAPI analyticsDataAPI = getAnalyticsDataAPI();
|
|
|
|
|
EventRecords eventRecords = new EventRecords();
|
|
|
|
|
int eventCount = analyticsDataAPI.searchCount(tenantId, tableName, query);
|
|
|
|
|
if (eventCount == 0) {
|
|
|
|
|
eventRecords.setCount(0);
|
|
|
|
|
}
|
|
|
|
|
List<SearchResultEntry> resultEntries = analyticsDataAPI.search(tenantId, tableName, query, offset, limit,
|
|
|
|
|
sortByFields);
|
|
|
|
|
List<String> recordIds = getRecordIds(resultEntries);
|
|
|
|
|
AnalyticsDataResponse response = analyticsDataAPI.get(tenantId, tableName, 1, null, recordIds);
|
|
|
|
|
eventRecords.setCount(eventCount);
|
|
|
|
|
eventRecords.setList(AnalyticsDataAPIUtil.listRecords(analyticsDataAPI, response));
|
|
|
|
|
return eventRecords;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static List<String> getRecordIds(List<SearchResultEntry> searchResults) {
|
|
|
|
|
List<String> ids = new ArrayList<>();
|
|
|
|
|
for (SearchResultEntry searchResult : searchResults) {
|
|
|
|
|
ids.add(searchResult.getId());
|
|
|
|
|
}
|
|
|
|
|
return ids;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void cleanup(Stub stub) {
|
|
|
|
|
if (stub != null) {
|
|
|
|
|
try {
|
|
|
|
|