|
|
|
@ -110,6 +110,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
private static final String DEFAULT_WEBSOCKET_PUBLISHER_ADAPTER_TYPE = "secured-websocket";
|
|
|
|
|
private static final String OAUTH_MQTT_ADAPTER_TYPE = "oauth-mqtt";
|
|
|
|
|
private static final String OAUTH_HTTP_ADAPTER_TYPE = "oauth-http";
|
|
|
|
|
private static final String DEFAULT_DEVICE_ID_ATTRIBUTE = "deviceId";
|
|
|
|
|
|
|
|
|
|
private static KeyStore keyStore;
|
|
|
|
|
private static KeyStore trustStore;
|
|
|
|
@ -162,6 +163,9 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
EventAttributeList eventAttributeList = new EventAttributeList();
|
|
|
|
|
List<Attribute> attributes = new ArrayList<>();
|
|
|
|
|
for (EventStreamAttributeDto eventStreamAttributeDto : eventStreamAttributeDtos) {
|
|
|
|
|
if (DEFAULT_DEVICE_ID_ATTRIBUTE.equals(eventStreamAttributeDto.getAttributeName())) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
attributes.add(new Attribute(eventStreamAttributeDto.getAttributeName()
|
|
|
|
|
, AttributeType.valueOf(eventStreamAttributeDto.getAttributeType().toUpperCase())));
|
|
|
|
|
}
|
|
|
|
@ -278,8 +282,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
if (deviceType == null ||
|
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
String errorMessage = "Invalid device type";
|
|
|
|
|
log.error(errorMessage);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
|
|
|
|
|
}
|
|
|
|
|
String eventReceiverName = getReceiverName(deviceType, tenantDomain);
|
|
|
|
|
String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher";
|
|
|
|
@ -310,7 +313,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
tenantBasedEventReceiverAdminServiceStub = getEventReceiverAdminServiceStub();
|
|
|
|
|
tenantBasedEventStreamAdminServiceStub = getEventStreamAdminServiceStub();
|
|
|
|
|
tenantBasedEventStreamAdminServiceStub.removeEventStreamDefinition(streamName, DEFAULT_STREAM_VERSION);
|
|
|
|
|
tenantBasedEventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration(eventReceiverName
|
|
|
|
|
tenantBasedEventReceiverAdminServiceStub.startundeployInactiveEventReceiverConfiguration(
|
|
|
|
|
eventReceiverName
|
|
|
|
|
, eventReceiverAdminServiceCallbackHandler);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
@ -350,6 +354,10 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
public Response getData(@PathParam("deviceId") String deviceId, @QueryParam("from") long from,
|
|
|
|
|
@QueryParam("to") long to, @PathParam("type") String deviceType, @QueryParam("offset")
|
|
|
|
|
int offset, @QueryParam("limit") int limit) {
|
|
|
|
|
if (from == 0 || to == 0) {
|
|
|
|
|
String errorMessage = "Invalid values for from/to";
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
|
|
|
|
|
}
|
|
|
|
|
String fromDate = String.valueOf(from);
|
|
|
|
|
String toDate = String.valueOf(to);
|
|
|
|
|
String query = "deviceId:" + deviceId + " AND _timestamp : [" + fromDate + " TO " + toDate + "]";
|
|
|
|
@ -367,7 +375,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
}
|
|
|
|
|
List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
SortByField sortByField = new SortByField("_timestamp", SortType.ASC);
|
|
|
|
|
SortByField sortByField = new SortByField("_timestamp", SortType.DESC);
|
|
|
|
|
sortByFields.add(sortByField);
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, offset, limit);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
@ -385,6 +393,43 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@GET
|
|
|
|
|
@Path("/last-known/{type}/{deviceId}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response getLastKnownData(@PathParam("deviceId") String deviceId, @PathParam("type") String deviceType) {
|
|
|
|
|
String query = "deviceId:" + deviceId;
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
String sensorTableName = getTableName(getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
|
try {
|
|
|
|
|
if (deviceType == null ||
|
|
|
|
|
!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(deviceType)) {
|
|
|
|
|
String errorMessage = "Invalid device type";
|
|
|
|
|
log.error(errorMessage);
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
}
|
|
|
|
|
if (!DeviceMgtAPIUtils.getDeviceAccessAuthorizationService().isUserAuthorized(
|
|
|
|
|
new DeviceIdentifier(deviceId, deviceType))) {
|
|
|
|
|
return Response.status(Response.Status.UNAUTHORIZED.getStatusCode()).build();
|
|
|
|
|
}
|
|
|
|
|
List<SortByField> sortByFields = new ArrayList<>();
|
|
|
|
|
SortByField sortByField = new SortByField("_timestamp", SortType.DESC);
|
|
|
|
|
sortByFields.add(sortByField);
|
|
|
|
|
EventRecords eventRecords = getAllEventsForDevice(sensorTableName, query, sortByFields, 0, 1);
|
|
|
|
|
return Response.status(Response.Status.OK.getStatusCode()).entity(eventRecords).build();
|
|
|
|
|
} catch (AnalyticsException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
} catch (DeviceAccessAuthorizationException e) {
|
|
|
|
|
log.error(e.getErrorMessage(), e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (DeviceManagementException e) {
|
|
|
|
|
String errorMsg = "Error on retrieving stats on table " + sensorTableName + " with query " + query;
|
|
|
|
|
log.error(errorMsg);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()).entity(errorMsg).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void publishEventReceivers(String eventRecieverName, String streamNameWithVersion,
|
|
|
|
|
TransportType transportType
|
|
|
|
|
, String requestedTenantDomain, String deviceType)
|
|
|
|
@ -434,7 +479,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
|
|
|
|
|
eventStreamAttributeDto.setAttributeName("deviceId");
|
|
|
|
|
eventStreamAttributeDto.setAttributeName(DEFAULT_DEVICE_ID_ATTRIBUTE);
|
|
|
|
|
eventStreamAttributeDto.setAttributeType(AttributeType.STRING.toString());
|
|
|
|
|
eventStreamAttributeDtos[i] = eventStreamAttributeDto;
|
|
|
|
|
eventStreamDefinitionDto.setPayloadData(eventStreamAttributeDtos);
|
|
|
|
@ -476,7 +521,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
i++;
|
|
|
|
|
}
|
|
|
|
|
AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
|
analyticsTableRecord.setColumnName("deviceId");
|
|
|
|
|
analyticsTableRecord.setColumnName(DEFAULT_DEVICE_ID_ATTRIBUTE);
|
|
|
|
|
analyticsTableRecord.setColumnType(AttributeType.STRING.toString().toUpperCase());
|
|
|
|
|
analyticsTableRecord.setFacet(false);
|
|
|
|
|
analyticsTableRecord.setIndexed(true);
|
|
|
|
|