|
|
|
@ -111,6 +111,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
private static final String OAUTH_MQTT_ADAPTER_TYPE = "oauth-mqtt";
|
|
|
|
|
private static final String OAUTH_HTTP_ADAPTER_TYPE = "oauth-http";
|
|
|
|
|
private static final String DEFAULT_DEVICE_ID_ATTRIBUTE = "deviceId";
|
|
|
|
|
private static final String DEFAULT_META_DEVICE_ID_ATTRIBUTE = "meta_deviceId";
|
|
|
|
|
|
|
|
|
|
private static KeyStore keyStore;
|
|
|
|
|
private static KeyStore trustStore;
|
|
|
|
@ -163,9 +164,6 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
EventAttributeList eventAttributeList = new EventAttributeList();
|
|
|
|
|
List<Attribute> attributes = new ArrayList<>();
|
|
|
|
|
for (EventStreamAttributeDto eventStreamAttributeDto : eventStreamAttributeDtos) {
|
|
|
|
|
if (DEFAULT_DEVICE_ID_ATTRIBUTE.equals(eventStreamAttributeDto.getAttributeName())) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
attributes.add(new Attribute(eventStreamAttributeDto.getAttributeName()
|
|
|
|
|
, AttributeType.valueOf(eventStreamAttributeDto.getAttributeType().toUpperCase())));
|
|
|
|
|
}
|
|
|
|
@ -176,11 +174,13 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
eventReceiverAdminServiceStub = getEventReceiverAdminServiceStub();
|
|
|
|
|
EventReceiverConfigurationDto eventReceiverConfigurationDto = eventReceiverAdminServiceStub
|
|
|
|
|
.getActiveEventReceiverConfiguration(getReceiverName(deviceType, tenantDomain));
|
|
|
|
|
if (eventReceiverConfigurationDto != null) {
|
|
|
|
|
String eventAdapterType = eventReceiverConfigurationDto.getFromAdapterConfigurationDto()
|
|
|
|
|
.getEventAdapterType();
|
|
|
|
|
if (OAUTH_MQTT_ADAPTER_TYPE.equals(eventAdapterType)) {
|
|
|
|
|
deviceTypeEvent.setTransportType(TransportType.MQTT);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return Response.ok().entity(deviceTypeEvent).build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("failed to retrieve event definitions for tenantDomain:" + tenantDomain, e);
|
|
|
|
@ -285,7 +285,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).entity(errorMessage).build();
|
|
|
|
|
}
|
|
|
|
|
String eventReceiverName = getReceiverName(deviceType, tenantDomain);
|
|
|
|
|
String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher";
|
|
|
|
|
String eventPublisherName = deviceType.trim().replace(" ", "_") + "_websocket_publisher";
|
|
|
|
|
String streamName = getStreamDefinition(deviceType, tenantDomain);
|
|
|
|
|
eventStreamAdminServiceStub = getEventStreamAdminServiceStub();
|
|
|
|
|
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamName + ":" + DEFAULT_STREAM_VERSION) == null) {
|
|
|
|
@ -360,7 +360,8 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
}
|
|
|
|
|
String fromDate = String.valueOf(from);
|
|
|
|
|
String toDate = String.valueOf(to);
|
|
|
|
|
String query = "deviceId:" + deviceId + " AND _timestamp : [" + fromDate + " TO " + toDate + "]";
|
|
|
|
|
String query = DEFAULT_META_DEVICE_ID_ATTRIBUTE + ":" + deviceId
|
|
|
|
|
+ " AND _timestamp : [" + fromDate + " TO " + toDate + "]";
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
String sensorTableName = getTableName(getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
|
try {
|
|
|
|
@ -397,7 +398,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
@Path("/last-known/{type}/{deviceId}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response getLastKnownData(@PathParam("deviceId") String deviceId, @PathParam("type") String deviceType) {
|
|
|
|
|
String query = "deviceId:" + deviceId;
|
|
|
|
|
String query = DEFAULT_META_DEVICE_ID_ATTRIBUTE + ":" + deviceId;
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
String sensorTableName = getTableName(getStreamDefinition(deviceType, tenantDomain));
|
|
|
|
|
try {
|
|
|
|
@ -436,6 +437,25 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
|
EventReceiverAdminServiceStub receiverAdminServiceStub = getEventReceiverAdminServiceStub();
|
|
|
|
|
try {
|
|
|
|
|
EventReceiverConfigurationDto eventReceiverConfigurationDto = receiverAdminServiceStub
|
|
|
|
|
.getActiveEventReceiverConfiguration(getReceiverName(deviceType, requestedTenantDomain));
|
|
|
|
|
if (eventReceiverConfigurationDto != null) {
|
|
|
|
|
String eventAdapterType = eventReceiverConfigurationDto.getFromAdapterConfigurationDto()
|
|
|
|
|
.getEventAdapterType();
|
|
|
|
|
if (OAUTH_MQTT_ADAPTER_TYPE.equals(eventAdapterType)) {
|
|
|
|
|
if (transportType == TransportType.MQTT) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else if (OAUTH_HTTP_ADAPTER_TYPE.equals(eventAdapterType)) {
|
|
|
|
|
if (transportType == TransportType.HTTP) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// remove mqtt event reciever before publishing
|
|
|
|
|
receiverAdminServiceStub.undeployActiveEventReceiverConfiguration(eventRecieverName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String adapterType = OAUTH_MQTT_ADAPTER_TYPE;
|
|
|
|
|
BasicInputAdapterPropertyDto basicInputAdapterPropertyDtos[];
|
|
|
|
|
if (transportType == TransportType.MQTT) {
|
|
|
|
@ -448,7 +468,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
} else {
|
|
|
|
|
adapterType = OAUTH_HTTP_ADAPTER_TYPE;
|
|
|
|
|
basicInputAdapterPropertyDtos = new BasicInputAdapterPropertyDto[1];
|
|
|
|
|
basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("contentValidator", "iot-mqtt");
|
|
|
|
|
basicInputAdapterPropertyDtos[0] = getBasicInputAdapterPropertyDto("contentValidator", "iot-http");
|
|
|
|
|
}
|
|
|
|
|
if (receiverAdminServiceStub.getActiveEventReceiverConfiguration(eventRecieverName) == null) {
|
|
|
|
|
receiverAdminServiceStub.deployJsonEventReceiverConfiguration(eventRecieverName, streamNameWithVersion
|
|
|
|
@ -468,7 +488,9 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
eventStreamDefinitionDto.setName(streamName);
|
|
|
|
|
eventStreamDefinitionDto.setVersion(version);
|
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDtos[] =
|
|
|
|
|
new EventStreamAttributeDto[eventAttributes.getList().size() + 1];
|
|
|
|
|
new EventStreamAttributeDto[eventAttributes.getList().size()];
|
|
|
|
|
EventStreamAttributeDto metaStreamAttributeDtos[] =
|
|
|
|
|
new EventStreamAttributeDto[1];
|
|
|
|
|
int i = 0;
|
|
|
|
|
for (Attribute attribute : eventAttributes.getList()) {
|
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
|
|
|
|
@ -481,8 +503,9 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
EventStreamAttributeDto eventStreamAttributeDto = new EventStreamAttributeDto();
|
|
|
|
|
eventStreamAttributeDto.setAttributeName(DEFAULT_DEVICE_ID_ATTRIBUTE);
|
|
|
|
|
eventStreamAttributeDto.setAttributeType(AttributeType.STRING.toString());
|
|
|
|
|
eventStreamAttributeDtos[i] = eventStreamAttributeDto;
|
|
|
|
|
metaStreamAttributeDtos[0] = eventStreamAttributeDto;
|
|
|
|
|
eventStreamDefinitionDto.setPayloadData(eventStreamAttributeDtos);
|
|
|
|
|
eventStreamDefinitionDto.setMetaData(metaStreamAttributeDtos);
|
|
|
|
|
String streamId = streamName + ":" + version;
|
|
|
|
|
if (eventStreamAdminServiceStub.getStreamDefinitionDto(streamId) != null) {
|
|
|
|
|
eventStreamAdminServiceStub.editEventStreamDefinitionAsDto(eventStreamDefinitionDto, streamId);
|
|
|
|
@ -521,7 +544,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
i++;
|
|
|
|
|
}
|
|
|
|
|
AnalyticsTableRecord analyticsTableRecord = new AnalyticsTableRecord();
|
|
|
|
|
analyticsTableRecord.setColumnName(DEFAULT_DEVICE_ID_ATTRIBUTE);
|
|
|
|
|
analyticsTableRecord.setColumnName(DEFAULT_META_DEVICE_ID_ATTRIBUTE);
|
|
|
|
|
analyticsTableRecord.setColumnType(AttributeType.STRING.toString().toUpperCase());
|
|
|
|
|
analyticsTableRecord.setFacet(false);
|
|
|
|
|
analyticsTableRecord.setIndexed(true);
|
|
|
|
@ -540,7 +563,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
throws RemoteException, UserStoreException, JWTClientException {
|
|
|
|
|
EventPublisherAdminServiceStub eventPublisherAdminServiceStub = getEventPublisherAdminServiceStub();
|
|
|
|
|
try {
|
|
|
|
|
String eventPublisherName = deviceType.trim().toLowerCase() + "_websocket_publisher";
|
|
|
|
|
String eventPublisherName = deviceType.trim().replace(" ", "_") + "_websocket_publisher";
|
|
|
|
|
if (eventPublisherAdminServiceStub.getActiveEventPublisherConfiguration(eventPublisherName) == null) {
|
|
|
|
|
eventPublisherAdminServiceStub.deployJsonEventPublisherConfiguration(eventPublisherName
|
|
|
|
|
, streamNameWithVersion, DEFAULT_WEBSOCKET_PUBLISHER_ADAPTER_TYPE, null, null
|
|
|
|
@ -747,7 +770,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getStreamDefinition(String deviceType, String tenantDomain) {
|
|
|
|
|
return tenantDomain.toLowerCase() + "." + deviceType.toLowerCase();
|
|
|
|
|
return "iot.per.device.stream." + tenantDomain + "." + deviceType.replace(" ", ".");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getTableName(String streamName) {
|
|
|
|
@ -755,7 +778,7 @@ public class DeviceEventManagementServiceImpl implements DeviceEventManagementSe
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getReceiverName(String deviceType, String tenantDomain) {
|
|
|
|
|
return deviceType.trim().toLowerCase() + "-" + tenantDomain.toLowerCase() + "-receiver";
|
|
|
|
|
return deviceType.replace(" ", "_").trim() + "-" + tenantDomain + "-receiver";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static AnalyticsDataAPI getAnalyticsDataAPI() {
|
|
|
|
|