22
33import java .io .IOException ;
44import java .util .ArrayList ;
5- import java .util .LinkedList ;
65import java .util .List ;
76import java .util .Map ;
87import java .util .UUID ;
98
9+ import javax .annotation .PostConstruct ;
1010import javax .websocket .CloseReason ;
1111import javax .websocket .EndpointConfig ;
1212import javax .websocket .OnClose ;
2121import org .apache .http .message .BasicHeader ;
2222import org .slf4j .Logger ;
2323import org .slf4j .LoggerFactory ;
24+ import org .springframework .beans .BeansException ;
2425import org .springframework .beans .factory .annotation .Autowired ;
2526import org .springframework .beans .factory .annotation .Qualifier ;
2627import org .springframework .context .ApplicationContext ;
28+ import org .springframework .context .ApplicationContextAware ;
2729
2830import com .fasterxml .jackson .core .JsonParseException ;
2931import com .fasterxml .jackson .databind .JsonMappingException ;
4446 * @author predix.adoption@ge.com -
4547 */
4648@ ServerEndpoint (value = "/livestream/{nodeId}" ,configurator =DXWebSocketServerConfig .class )
47- public class DXWebSocketServerEndPoint {
49+ public class DXWebSocketServerEndPoint implements ApplicationContextAware {
4850
4951 private static Logger log = LoggerFactory .getLogger (DXWebSocketServerEndPoint .class );
5052
51- private static final LinkedList <Session > clients = new LinkedList <Session >();
53+ private static final List <Session > clients = new ArrayList <Session >();
5254
5355 @ Autowired
5456 @ Qualifier ("putFieldDataService" )
@@ -60,32 +62,61 @@ public class DXWebSocketServerEndPoint{
6062
6163 private String nodeId ;
6264
65+
6366 private ApplicationContext context ;
6467
68+ /**
69+ * -
70+ */
71+ @ SuppressWarnings ("nls" )
72+ @ PostConstruct
73+ public void init () {
74+ log .debug ("init: context=" + this .context );
75+ log .debug ("init: putFieldDataService=" + this .putFieldDataService );
76+ }
6577
6678 /**
6779 * @param nodeId1 - nodeId for the session
6880 * @param session - session object
6981 * @param ec
7082 * -
7183 */
72- @ OnOpen
84+ @ SuppressWarnings ("nls" )
85+ @ OnOpen
7386 public void onOpen (@ PathParam (value = "nodeId" ) String nodeId1 , final Session session , EndpointConfig ec ) {
7487 //logger.info("headers : "+request.getHeaders()); //$NON-NLS-1$
75- this .nodeId = nodeId1 ;
76- clients .add (session );
77- log .info ("Server: opened... for Node Id : " + nodeId1 + " : " + session .getId ()); //$NON-NLS-1$ //$NON-NLS-2$
78- log .info ("Nunmber of open connections : " + session .getOpenSessions ().size ()); //$NON-NLS-1$
88+ synchronized (clients )
89+ {
90+ this .nodeId = nodeId1 ;
91+ clients .add (session );
92+ log .info ("Server: opened... for Node Id : " + nodeId1 + " : " + session .getId () + " Number of open connections : " + session .getOpenSessions ().size () + " clientsSize : " + clients .size ());
93+ }
7994 }
8095
8196 /**
97+ * @param session
98+ * - session object
99+ * @param closeReason
100+ * - The reason of close of session
101+ */
102+ @ SuppressWarnings ("nls" )
103+ @ OnClose
104+ public void onClose (Session session , CloseReason closeReason ) {
105+ synchronized (clients )
106+ {
107+ clients .remove (session );
108+ log .info ("Server: Session " + session .getId () + " RequestURI : " + session .getRequestURI () + " closed because of " + closeReason .getReasonPhrase () + " Number of open connections : " + session .getOpenSessions ().size () + " clientsSize : " + clients .size ()); //$NON-NLS-1$ //$NON-NLS-2$
109+ }
110+ }
111+
112+ /**
82113 * @param message -
83114 * @param session -
84115 */
85116 @ SuppressWarnings ({ "unchecked" , "nls" })
86117 @ OnMessage
87118 public void onMessage (String message , Session session ) {
88- log .info ("Node Id : " +session .getPathParameters ().get ("nodeId" )+ " Websocket Message : " + message ); //$NON-NLS-1$
119+ log .info ("Node Id: " +session .getPathParameters ().get ("nodeId" ) + " Websocket Message: " + message ); //$NON-NLS-1$
89120 String currentNodeId = session .getPathParameters ().get ("nodeId" );
90121 //log.debug("RequestParameterMap : "+session.getUserProperties()); //$NON-NLS-1$
91122 this .context = (ApplicationContext ) session .getUserProperties ().get ("applicationContext" );
@@ -111,51 +142,70 @@ public void onMessage(String message, Session session) {
111142 }
112143 session .getBasicRemote ().sendText ("SUCCESS" ); //$NON-NLS-1$
113144 }
145+ // send data only to the matching node
146+ //session can be message (ingestion) /compressor2017:<sensor> from UI.
147+ // if session from UI is open send ingestion data to that node as well.
148+ /*for (Session s:clients) {
149+ log.debug("Session is s"+s.getId()+" URL=" +s.getRequestURI().toString());
150+ if (s.isOpen() && !this.nodeId.equals(currentNodeId) && timeSeriesRequest !=null && s.getRequestURI().toString().contains(timeSeriesRequest.getBody().get(0).getName())) {
151+ log.debug("sending data for "+currentNodeId + "message=" +message);
152+ s.getBasicRemote().sendText(message);
153+ }
154+ }*/
114155 if ("messages" .equalsIgnoreCase (currentNodeId )) { //$NON-NLS-1$
115156 if (timeSeriesRequest != null && timeSeriesRequest .getMessageId () != null ) {
116157 putFieldDataRequest = new PutFieldDataRequest ();
117158 putFieldDataRequest .setCorrelationId (UUID .randomUUID ().toString ());
118159 PutFieldDataCriteria criteria = new PutFieldDataCriteria ();
119160 FieldData fieldData = new FieldData ();
120161 String [] fieldSources = this .config .getDefaultFieldSource ().split ("," );
121- for (String source :fieldSources ) {
122- Field field = new Field ();
123- FieldIdentifier fieldIdentifier = new FieldIdentifier ();
124- fieldIdentifier .setSource (source );
125- field .setFieldIdentifier (fieldIdentifier );
126- fieldData .getField ().add (field );
127- }
162+
163+ //send to websocket handler
128164 Field field = new Field ();
129- FieldIdentifier fieldIdentifier = new FieldIdentifier ();
130- fieldIdentifier .setSource ("handler/webSocketHandler" );
131- field .setFieldIdentifier (fieldIdentifier );
132- fieldData .getField ().add (field );
165+ FieldIdentifier fieldIdentifier = new FieldIdentifier ();
166+ fieldIdentifier .setSource ("handler/webSocketHandler" );
167+ field .setFieldIdentifier (fieldIdentifier );
168+ fieldData .getField ().add (field );
169+
170+ //send to other sources as defined in properties
171+ for (String source : fieldSources ) {
172+ Field field2 = new Field ();
173+ FieldIdentifier fieldIdentifier2 = new FieldIdentifier ();
174+ fieldIdentifier2 .setSource (source );
175+ field2 .setFieldIdentifier (fieldIdentifier2 );
176+ fieldData .getField ().add (field2 );
177+ }
133178
134179 fieldData .setData (timeSeriesRequest );
135180 criteria .setFieldData (fieldData );
136181 List <PutFieldDataCriteria > list = new ArrayList <PutFieldDataCriteria >();
137182 list .add (criteria );
138183 putFieldDataRequest .setPutFieldDataCriteria (list );
139184 }
140- log .info ("PutFieldDataRequest " +this .mapper .toJson (putFieldDataRequest ));
185+ log .info ("PutFieldDataRequest= " +this .mapper .toJson (putFieldDataRequest ));
141186 if (putFieldDataRequest != null ) {
142187 //TODO support headers passed in
143188 List <Header > headers = new ArrayList <Header >();
144189 String [] headerNames = {"authorization" ,"predix-zone-id" };
145- //log.debug(session.getUserProperties().toString());
146190 Map <String ,List <String >> headerMap = (Map <String , List <String >>) session .getUserProperties ().get ("headers" );
147191
148192 for (String headerName :headerNames ) {
149- log .debug ("Header Name " + headerName );
193+ log .debug ("Header Name=" + headerName );
150194 List <String > values = headerMap .get (headerName );
151195 if (values != null ) {
152196 headers .add (new BasicHeader (headerName , values .get (0 )));
153197 }
154198 }
199+ List <Session > clientsToSendTo = new ArrayList <Session >();
200+ synchronized (clients ) {
201+ for ( Session s : clients ) {
202+ clientsToSendTo .add (s );
203+ }
204+ }
155205 AttributeMap attributeMap = new AttributeMap ();
156206 Entry entryClients = new Entry ();
157207 entryClients .setKey ("SESSIONS" );
158- entryClients .setValue (clients );
208+ entryClients .setValue (clientsToSendTo );
159209 attributeMap .getEntry ().add (entryClients );
160210
161211 Entry entryCurrentSession = new Entry ();
@@ -167,36 +217,18 @@ public void onMessage(String message, Session session) {
167217 putFieldDataRequest .setExternalAttributeMap (attributeMap );
168218
169219 this .putFieldDataService .putData (putFieldDataRequest , null , headers );
170- log .debug ("No of opensessions : " + clients .size ()); //$NON-NLS-1$
171- for (Session s :clients ) {
172- if (s .isOpen () && !this .nodeId .equals (currentNodeId )) {
173- s .getBasicRemote ().sendText (message );
174- }
175- }
220+ log .debug ("No of opensessions : " + clientsToSendTo .size ()); //$NON-NLS-1$
176221 String response = "{\" messageId\" : " + putFieldDataRequest .getCorrelationId () + ",\" statusCode\" : 202}" ; //$NON-NLS-1$ //$NON-NLS-2$
177222 session .getBasicRemote ().sendText (response );
178223 }
179-
180- }
224+ }
181225 } catch (Throwable e ) {
182226 log .error ("unable to process websocket message" ,e );
183227 throw new RuntimeException ("unable to process websocket message" ,e );
184228 }
185229 }
186230
187231
188- /**
189- * @param session
190- * - session object
191- * @param closeReason
192- * - The reason of close of session
193- */
194- @ OnClose
195- public void onClose (Session session , CloseReason closeReason ) {
196- log .info ("Server: Session " + session .getId () + " closed because of " + closeReason .getReasonPhrase ()); //$NON-NLS-1$ //$NON-NLS-2$
197- clients .remove (session );
198- }
199-
200232 /**
201233 * @param session
202234 * - current session object
@@ -220,4 +252,16 @@ private boolean checkJsonCompatibility(String jsonStr, Class<?> valueType) throw
220252 }
221253
222254 }
255+
256+ /* (non-Javadoc)
257+ * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
258+ */
259+ @ SuppressWarnings ("nls" )
260+ @ Override
261+ public void setApplicationContext (ApplicationContext applicationContext )
262+ throws BeansException
263+ {
264+ this .context = applicationContext ;
265+ log .debug ("setApplicationContext: context=" + this .context );
266+ }
223267}
0 commit comments