Skip to content

Commit 819497a

Browse files
committed
IGNITE-27627 Remove marshalling from TcpDiscoveryCustomEventMessage and SecurityAwareCustomMessageWrapper
1 parent c1ec826 commit 819497a

File tree

5 files changed

+52
-207
lines changed

5 files changed

+52
-207
lines changed

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,20 @@
1818
package org.apache.ignite.internal.managers.discovery;
1919

2020
import java.util.UUID;
21-
import org.apache.ignite.IgniteCheckedException;
22-
import org.apache.ignite.internal.MarshallableMessage;
2321
import org.apache.ignite.internal.Order;
24-
import org.apache.ignite.internal.util.typedef.internal.U;
25-
import org.apache.ignite.marshaller.Marshaller;
26-
import org.apache.ignite.plugin.extensions.communication.Message;
2722
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
2823
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2924
import org.jetbrains.annotations.Nullable;
3025

3126
/** Custom message wrapper with ID of security subject that initiated the current message. */
32-
public class SecurityAwareCustomMessageWrapper implements DiscoverySpiCustomMessage, MarshallableMessage {
27+
public class SecurityAwareCustomMessageWrapper implements DiscoverySpiCustomMessage {
3328
/** Security subject ID. */
3429
@Order(0)
3530
UUID secSubjId;
3631

3732
/** Original message. */
38-
private DiscoveryCustomMessage delegate;
39-
40-
/** */
41-
// TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627
4233
@Order(1)
43-
Message msg;
44-
45-
/** Serialized message bytes. */
46-
// TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627
47-
@Order(2)
48-
byte[] msgBytes;
34+
DiscoveryCustomMessage delegate;
4935

5036
/** Default constructor for {@link MessageFactory}. */
5137
public SecurityAwareCustomMessageWrapper() {
@@ -56,9 +42,6 @@ public SecurityAwareCustomMessageWrapper() {
5642
public SecurityAwareCustomMessageWrapper(DiscoveryCustomMessage delegate, UUID secSubjId) {
5743
this.delegate = delegate;
5844
this.secSubjId = secSubjId;
59-
60-
if (delegate instanceof Message)
61-
msg = (Message)delegate;
6245
}
6346

6447
/** Gets security Subject ID. */
@@ -80,7 +63,7 @@ public UUID securitySubjectId() {
8063
* @return Delegate.
8164
*/
8265
public DiscoveryCustomMessage delegate() {
83-
return msg != null ? (DiscoveryCustomMessage)msg : delegate;
66+
return delegate;
8467
}
8568

8669
/** {@inheritDoc} */
@@ -89,16 +72,4 @@ public DiscoveryCustomMessage delegate() {
8972

9073
return ack == null ? null : new SecurityAwareCustomMessageWrapper(ack, secSubjId);
9174
}
92-
93-
/** {@inheritDoc} */
94-
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
95-
if (!(delegate instanceof Message))
96-
msgBytes = U.marshal(marsh, delegate);
97-
}
98-
99-
/** {@inheritDoc} */
100-
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
101-
if (msgBytes != null)
102-
delegate = U.unmarshal(marsh, msgBytes, clsLdr);
103-
}
10475
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -500,35 +500,28 @@ else if (state == DISCONNECTED) {
500500
if (state == STOPPED || state == SEGMENTED || state == STARTING)
501501
throw new IgniteException("Failed to send custom message: client is " + state.name().toLowerCase() + ".");
502502

503-
try {
504-
TcpDiscoveryCustomEventMessage msg;
505-
506-
DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
503+
TcpDiscoveryCustomEventMessage msg;
507504

508-
if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
509-
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
510-
else
511-
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);
505+
DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
512506

513-
Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
514-
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
515-
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
516-
() -> locNode.consistentId().toString())
517-
.addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName())
518-
.addLog(() -> "Created");
507+
if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
508+
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
509+
else
510+
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);
519511

520-
// This root span will be parent both from local and remote nodes.
521-
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
512+
Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
513+
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
514+
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
515+
() -> locNode.consistentId().toString())
516+
.addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName())
517+
.addLog(() -> "Created");
522518

523-
msg.prepareMarshal(spi.marshaller());
519+
// This root span will be parent both from local and remote nodes.
520+
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
524521

525-
sockWriter.sendMessage(msg);
522+
sockWriter.sendMessage(msg);
526523

527-
rootSpan.addLog(() -> "Sent").end();
528-
}
529-
catch (IgniteCheckedException e) {
530-
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
531-
}
524+
rootSpan.addLog(() -> "Sent").end();
532525
}
533526

534527
/** {@inheritDoc} */
@@ -2595,17 +2588,8 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
25952588
TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);
25962589

25972590
if (node != null && node.visible()) {
2598-
try {
2599-
msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
2600-
2601-
DiscoverySpiCustomMessage msgObj = msg.message();
2602-
2603-
notifyDiscovery(
2604-
EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj, msg.spanContainer());
2605-
}
2606-
catch (Throwable e) {
2607-
U.error(log, "Failed to unmarshal discovery custom message.", e);
2608-
}
2591+
notifyDiscovery(
2592+
EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msg.message(), msg.spanContainer());
26092593
}
26102594
else if (log.isDebugEnabled())
26112595
log.debug("Received metrics from unknown node: " + nodeId);

modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java

Lines changed: 26 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,35 +1017,28 @@ private void interruptPing(TcpDiscoveryNode node) {
10171017

10181018
/** {@inheritDoc} */
10191019
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
1020-
try {
1021-
TcpDiscoveryCustomEventMessage msg;
1020+
TcpDiscoveryCustomEventMessage msg;
10221021

1023-
DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
1022+
DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
10241023

1025-
if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
1026-
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
1027-
else
1028-
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);
1024+
if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
1025+
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
1026+
else
1027+
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);
10291028

1030-
Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
1031-
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
1032-
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
1033-
() -> locNode.consistentId().toString())
1034-
.addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName())
1035-
.addLog(() -> "Created");
1029+
Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
1030+
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
1031+
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
1032+
() -> locNode.consistentId().toString())
1033+
.addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName())
1034+
.addLog(() -> "Created");
10361035

1037-
// This root span will be parent both from local and remote nodes.
1038-
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
1036+
// This root span will be parent both from local and remote nodes.
1037+
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
10391038

1040-
msg.prepareMarshal(spi.marshaller());
1041-
1042-
msgWorker.addMessage(msg);
1039+
msgWorker.addMessage(msg);
10431040

1044-
rootSpan.addLog(() -> "Sent").end();
1045-
}
1046-
catch (IgniteCheckedException e) {
1047-
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
1048-
}
1041+
rootSpan.addLog(() -> "Sent").end();
10491042
}
10501043

10511044
/** {@inheritDoc} */
@@ -2556,9 +2549,6 @@ else if (msg instanceof TcpDiscoveryCustomEventMessage) {
25562549
TcpDiscoveryCustomEventMessage msg0 = (TcpDiscoveryCustomEventMessage)msg;
25572550

25582551
msg = new TcpDiscoveryCustomEventMessage(msg0);
2559-
2560-
// We shoulgn't store deserialized message in the queue because of msg is transient.
2561-
((TcpDiscoveryCustomEventMessage)msg).clearMessage();
25622552
}
25632553

25642554
synchronized (msgs) {
@@ -6106,40 +6096,22 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa
61066096
processCustomMessage(msg, waitForNotification);
61076097
}
61086098
}
6109-
6110-
msg.clearMessage();
61116099
}
61126100
else {
61136101
addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));
61146102

6115-
DiscoverySpiCustomMessage msgObj = null;
6103+
DiscoverySpiCustomMessage customMsg = msg.message();
61166104

6117-
try {
6118-
msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
6119-
6120-
msgObj = msg.message();
6121-
}
6122-
catch (Throwable e) {
6123-
U.error(log, "Failed to unmarshal discovery custom message.", e);
6124-
}
6125-
6126-
if (msgObj != null) {
6127-
DiscoverySpiCustomMessage nextMsg = msgObj.ackMessage();
6105+
if (customMsg != null) {
6106+
DiscoverySpiCustomMessage nextMsg = customMsg.ackMessage();
61286107

61296108
if (nextMsg != null) {
6130-
try {
6131-
TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage(
6132-
getLocalNodeId(), nextMsg);
6133-
6134-
ackMsg.topologyVersion(msg.topologyVersion());
6109+
TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage(
6110+
getLocalNodeId(), nextMsg);
61356111

6136-
ackMsg.prepareMarshal(spi.marshaller());
6112+
ackMsg.topologyVersion(msg.topologyVersion());
61376113

6138-
processCustomMessage(ackMsg, waitForNotification);
6139-
}
6140-
catch (IgniteCheckedException e) {
6141-
U.error(log, "Failed to marshal discovery custom message.", e);
6142-
}
6114+
processCustomMessage(ackMsg, waitForNotification);
61436115
}
61446116
}
61456117
}
@@ -6165,8 +6137,6 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa
61656137
notifyDiscoveryListener(msg, waitForNotification);
61666138
}
61676139

6168-
msg.clearMessage();
6169-
61706140
if (sendMessageToRemotes(msg))
61716141
sendMessageAcrossRing(msg);
61726142
}
@@ -6303,16 +6273,7 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean
63036273
if (node == null)
63046274
return;
63056275

6306-
DiscoverySpiCustomMessage msgObj;
6307-
6308-
try {
6309-
msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));
6310-
6311-
msgObj = msg.message();
6312-
}
6313-
catch (Throwable t) {
6314-
throw new IgniteException("Failed to unmarshal discovery custom message: " + msg, t);
6315-
}
6276+
DiscoverySpiCustomMessage customMsg = msg.message();
63166277

63176278
IgniteFuture<?> fut = lsnr.onDiscovery(
63186279
new DiscoveryNotification(
@@ -6321,13 +6282,13 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean
63216282
node,
63226283
snapshot,
63236284
hist,
6324-
msgObj,
6285+
customMsg,
63256286
msg.spanContainer())
63266287
);
63276288

63286289
notifiedDiscovery.set(true);
63296290

6330-
if (waitForNotification || msgObj.isMutable()) {
6291+
if (waitForNotification || customMsg.isMutable()) {
63316292
blockingSectionBegin();
63326293

63336294
try {
@@ -6337,15 +6298,6 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean
63376298
blockingSectionEnd();
63386299
}
63396300
}
6340-
6341-
if (msgObj.isMutable()) {
6342-
try {
6343-
msg.prepareMarshal(spi.marshaller());
6344-
}
6345-
catch (Throwable t) {
6346-
throw new IgniteException("Failed to marshal mutable discovery message: " + msgObj, t);
6347-
}
6348-
}
63496301
}
63506302
}
63516303

0 commit comments

Comments
 (0)