|
|
|
@ -306,6 +306,106 @@ public class DeviceAgentServiceImpl implements DeviceAgentService {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@POST
|
|
|
|
|
@Path("/events/publish/data/{type}/{deviceId}")
|
|
|
|
|
@Override
|
|
|
|
|
public Response publishEvents(@Valid List<Object> payload, @PathParam("type") String type
|
|
|
|
|
, @PathParam("deviceId") String deviceId) {
|
|
|
|
|
|
|
|
|
|
String tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
|
|
|
|
EventStreamAdminServiceStub eventStreamAdminServiceStub = null;
|
|
|
|
|
try {
|
|
|
|
|
if (payload == null) {
|
|
|
|
|
String msg = "invalid payload structure";
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).entity(msg).build();
|
|
|
|
|
} else {
|
|
|
|
|
boolean authorized = DeviceMgtAPIUtils.getDeviceAccessAuthorizationService().isUserAuthorized
|
|
|
|
|
(new DeviceIdentifier(type, deviceId));
|
|
|
|
|
if (!authorized) {
|
|
|
|
|
String msg = "does not have permission to access the device.";
|
|
|
|
|
return Response.status(Response.Status.UNAUTHORIZED).entity(msg).build();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Object metaData[] = new Object[1];
|
|
|
|
|
metaData[0] = deviceId;
|
|
|
|
|
EventAttributeList eventAttributeList = DeviceMgtAPIUtils.getDynamicEventCache().get(type);
|
|
|
|
|
if (eventAttributeList == null) {
|
|
|
|
|
String streamName = DeviceMgtAPIUtils.getStreamDefinition(type, tenantDomain);
|
|
|
|
|
eventStreamAdminServiceStub = DeviceMgtAPIUtils.getEventStreamAdminServiceStub();
|
|
|
|
|
EventStreamDefinitionDto eventStreamDefinitionDto = eventStreamAdminServiceStub.getStreamDefinitionDto(
|
|
|
|
|
streamName + ":" + Constants.DEFAULT_STREAM_VERSION);
|
|
|
|
|
if (eventStreamDefinitionDto == null) {
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).build();
|
|
|
|
|
} else {
|
|
|
|
|
EventStreamAttributeDto[] eventStreamAttributeDtos = eventStreamDefinitionDto.getPayloadData();
|
|
|
|
|
List<Attribute> attributes = new ArrayList<>();
|
|
|
|
|
for (EventStreamAttributeDto eventStreamAttributeDto : eventStreamAttributeDtos) {
|
|
|
|
|
attributes.add(new Attribute(eventStreamAttributeDto.getAttributeName()
|
|
|
|
|
, AttributeType.valueOf(eventStreamAttributeDto.getAttributeType().toUpperCase())));
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
if (payload.size() != attributes.size()) {
|
|
|
|
|
String msg = "payload does not match with the stream definition";
|
|
|
|
|
return Response.status(Response.Status.BAD_REQUEST).entity(msg).build();
|
|
|
|
|
}
|
|
|
|
|
eventAttributeList = new EventAttributeList();
|
|
|
|
|
eventAttributeList.setList(attributes);
|
|
|
|
|
DeviceMgtAPIUtils.getDynamicEventCache().put(type, eventAttributeList);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
int i = 0;
|
|
|
|
|
Object[] payloadData = new Object[eventAttributeList.getList().size()];
|
|
|
|
|
for (Attribute attribute : eventAttributeList.getList()) {
|
|
|
|
|
if (attribute.getType() == AttributeType.INT) {
|
|
|
|
|
payloadData[i] = ((Double) payload.get(i)).intValue();
|
|
|
|
|
} else if (attribute.getType() == AttributeType.LONG) {
|
|
|
|
|
payloadData[i] = ((Double) payload.get(i)).longValue();
|
|
|
|
|
} else {
|
|
|
|
|
payloadData[i] = payload.get(i);
|
|
|
|
|
}
|
|
|
|
|
i++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (DeviceMgtAPIUtils.getEventPublisherService().publishEvent(DeviceMgtAPIUtils.getStreamDefinition(type
|
|
|
|
|
, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain())
|
|
|
|
|
, Constants.DEFAULT_STREAM_VERSION, metaData
|
|
|
|
|
, null, payloadData)) {
|
|
|
|
|
return Response.status(Response.Status.OK).build();
|
|
|
|
|
} else {
|
|
|
|
|
String msg = "Error occurred while publishing the event.";
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(msg).build();
|
|
|
|
|
}
|
|
|
|
|
} catch (DataPublisherConfigurationException e) {
|
|
|
|
|
String msg = "Error occurred while publishing the event.";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(msg).build();
|
|
|
|
|
} catch (DeviceAccessAuthorizationException e) {
|
|
|
|
|
String msg = "Error occurred when checking for authorization";
|
|
|
|
|
log.error(msg, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(msg).build();
|
|
|
|
|
} catch (AxisFault e) {
|
|
|
|
|
log.error("failed to retrieve event definitions for tenantDomain:" + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (RemoteException e) {
|
|
|
|
|
log.error("Failed to connect with the remote services:" + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (JWTClientException e) {
|
|
|
|
|
log.error("Failed to generate jwt token for tenantDomain:" + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} catch (UserStoreException e) {
|
|
|
|
|
log.error("Failed to connect with the user store, tenantDomain: " + tenantDomain, e);
|
|
|
|
|
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
|
|
|
|
|
} finally {
|
|
|
|
|
if (eventStreamAdminServiceStub != null) {
|
|
|
|
|
try {
|
|
|
|
|
eventStreamAdminServiceStub.cleanup();
|
|
|
|
|
} catch (AxisFault axisFault) {
|
|
|
|
|
log.warn("Failed to clean eventStreamAdminServiceStub");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@GET
|
|
|
|
|
@Path("/pending/operations/{type}/{id}")
|
|
|
|
|
public Response getPendingOperations(@PathParam("type") String type, @PathParam("id") String deviceId) {
|
|
|
|
@ -339,7 +439,7 @@ public class DeviceAgentServiceImpl implements DeviceAgentService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@GET
|
|
|
|
|
@Path("/last-pending/operation/{type}/{id}")
|
|
|
|
|
@Path("/next-pending/operation/{type}/{id}")
|
|
|
|
|
public Response getNextPendingOperation(@PathParam("type") String type, @PathParam("id") String deviceId) {
|
|
|
|
|
try {
|
|
|
|
|
if (!DeviceMgtAPIUtils.getDeviceManagementService().getAvailableDeviceTypes().contains(type)) {
|
|
|
|
|