@@ -1017,35 +1017,28 @@ private void interruptPing(TcpDiscoveryNode node) {
10171017
10181018 /** {@inheritDoc} */
10191019 @ Override public void sendCustomEvent (DiscoverySpiCustomMessage evt ) {
1020- try {
1021- TcpDiscoveryCustomEventMessage msg ;
1020+ TcpDiscoveryCustomEventMessage msg ;
10221021
1023- DiscoverySpiCustomMessage customMsg = U .unwrapCustomMessage (evt );
1022+ DiscoverySpiCustomMessage customMsg = U .unwrapCustomMessage (evt );
10241023
1025- if (customMsg instanceof DiscoveryServerOnlyCustomMessage )
1026- msg = new TcpDiscoveryServerOnlyCustomEventMessage (getLocalNodeId (), evt );
1027- else
1028- msg = new TcpDiscoveryCustomEventMessage (getLocalNodeId (), evt );
1024+ if (customMsg instanceof DiscoveryServerOnlyCustomMessage )
1025+ msg = new TcpDiscoveryServerOnlyCustomEventMessage (getLocalNodeId (), evt );
1026+ else
1027+ msg = new TcpDiscoveryCustomEventMessage (getLocalNodeId (), evt );
10291028
1030- Span rootSpan = tracing .create (TraceableMessagesTable .traceName (msg .getClass ()))
1031- .addTag (SpanTags .tag (SpanTags .EVENT_NODE , SpanTags .ID ), () -> getLocalNodeId ().toString ())
1032- .addTag (SpanTags .tag (SpanTags .EVENT_NODE , SpanTags .CONSISTENT_ID ),
1033- () -> locNode .consistentId ().toString ())
1034- .addTag (SpanTags .MESSAGE_CLASS , () -> customMsg .getClass ().getSimpleName ())
1035- .addLog (() -> "Created" );
1029+ Span rootSpan = tracing .create (TraceableMessagesTable .traceName (msg .getClass ()))
1030+ .addTag (SpanTags .tag (SpanTags .EVENT_NODE , SpanTags .ID ), () -> getLocalNodeId ().toString ())
1031+ .addTag (SpanTags .tag (SpanTags .EVENT_NODE , SpanTags .CONSISTENT_ID ),
1032+ () -> locNode .consistentId ().toString ())
1033+ .addTag (SpanTags .MESSAGE_CLASS , () -> customMsg .getClass ().getSimpleName ())
1034+ .addLog (() -> "Created" );
10361035
1037- // This root span will be parent both from local and remote nodes.
1038- msg .spanContainer ().serializedSpanBytes (tracing .serialize (rootSpan ));
1036+ // This root span will be parent both from local and remote nodes.
1037+ msg .spanContainer ().serializedSpanBytes (tracing .serialize (rootSpan ));
10391038
1040- msg .prepareMarshal (spi .marshaller ());
1041-
1042- msgWorker .addMessage (msg );
1039+ msgWorker .addMessage (msg );
10431040
1044- rootSpan .addLog (() -> "Sent" ).end ();
1045- }
1046- catch (IgniteCheckedException e ) {
1047- throw new IgniteSpiException ("Failed to marshal custom event: " + evt , e );
1048- }
1041+ rootSpan .addLog (() -> "Sent" ).end ();
10491042 }
10501043
10511044 /** {@inheritDoc} */
@@ -2556,9 +2549,6 @@ else if (msg instanceof TcpDiscoveryCustomEventMessage) {
25562549 TcpDiscoveryCustomEventMessage msg0 = (TcpDiscoveryCustomEventMessage )msg ;
25572550
25582551 msg = new TcpDiscoveryCustomEventMessage (msg0 );
2559-
2560- // We shoulgn't store deserialized message in the queue because of msg is transient.
2561- ((TcpDiscoveryCustomEventMessage )msg ).clearMessage ();
25622552 }
25632553
25642554 synchronized (msgs ) {
@@ -6106,40 +6096,22 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa
61066096 processCustomMessage (msg , waitForNotification );
61076097 }
61086098 }
6109-
6110- msg .clearMessage ();
61116099 }
61126100 else {
61136101 addMessage (new TcpDiscoveryDiscardMessage (getLocalNodeId (), msg .id (), true ));
61146102
6115- DiscoverySpiCustomMessage msgObj = null ;
6103+ DiscoverySpiCustomMessage customMsg = msg . message () ;
61166104
6117- try {
6118- msg .finishUnmarshal (spi .marshaller (), U .resolveClassLoader (spi .ignite ().configuration ()));
6119-
6120- msgObj = msg .message ();
6121- }
6122- catch (Throwable e ) {
6123- U .error (log , "Failed to unmarshal discovery custom message." , e );
6124- }
6125-
6126- if (msgObj != null ) {
6127- DiscoverySpiCustomMessage nextMsg = msgObj .ackMessage ();
6105+ if (customMsg != null ) {
6106+ DiscoverySpiCustomMessage nextMsg = customMsg .ackMessage ();
61286107
61296108 if (nextMsg != null ) {
6130- try {
6131- TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage (
6132- getLocalNodeId (), nextMsg );
6133-
6134- ackMsg .topologyVersion (msg .topologyVersion ());
6109+ TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage (
6110+ getLocalNodeId (), nextMsg );
61356111
6136- ackMsg .prepareMarshal ( spi . marshaller ());
6112+ ackMsg .topologyVersion ( msg . topologyVersion ());
61376113
6138- processCustomMessage (ackMsg , waitForNotification );
6139- }
6140- catch (IgniteCheckedException e ) {
6141- U .error (log , "Failed to marshal discovery custom message." , e );
6142- }
6114+ processCustomMessage (ackMsg , waitForNotification );
61436115 }
61446116 }
61456117 }
@@ -6165,8 +6137,6 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa
61656137 notifyDiscoveryListener (msg , waitForNotification );
61666138 }
61676139
6168- msg .clearMessage ();
6169-
61706140 if (sendMessageToRemotes (msg ))
61716141 sendMessageAcrossRing (msg );
61726142 }
@@ -6303,16 +6273,7 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean
63036273 if (node == null )
63046274 return ;
63056275
6306- DiscoverySpiCustomMessage msgObj ;
6307-
6308- try {
6309- msg .finishUnmarshal (spi .marshaller (), U .resolveClassLoader (spi .ignite ().configuration ()));
6310-
6311- msgObj = msg .message ();
6312- }
6313- catch (Throwable t ) {
6314- throw new IgniteException ("Failed to unmarshal discovery custom message: " + msg , t );
6315- }
6276+ DiscoverySpiCustomMessage customMsg = msg .message ();
63166277
63176278 IgniteFuture <?> fut = lsnr .onDiscovery (
63186279 new DiscoveryNotification (
@@ -6321,13 +6282,13 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean
63216282 node ,
63226283 snapshot ,
63236284 hist ,
6324- msgObj ,
6285+ customMsg ,
63256286 msg .spanContainer ())
63266287 );
63276288
63286289 notifiedDiscovery .set (true );
63296290
6330- if (waitForNotification || msgObj .isMutable ()) {
6291+ if (waitForNotification || customMsg .isMutable ()) {
63316292 blockingSectionBegin ();
63326293
63336294 try {
@@ -6337,15 +6298,6 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean
63376298 blockingSectionEnd ();
63386299 }
63396300 }
6340-
6341- if (msgObj .isMutable ()) {
6342- try {
6343- msg .prepareMarshal (spi .marshaller ());
6344- }
6345- catch (Throwable t ) {
6346- throw new IgniteException ("Failed to marshal mutable discovery message: " + msgObj , t );
6347- }
6348- }
63496301 }
63506302 }
63516303
0 commit comments