diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java index 7c8269bd14d8f..e9d33b8433cbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java @@ -18,34 +18,20 @@ package org.apache.ignite.internal.managers.discovery; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.jetbrains.annotations.Nullable; /** Custom message wrapper with ID of security subject that initiated the current message. */ -public class SecurityAwareCustomMessageWrapper implements DiscoverySpiCustomMessage, MarshallableMessage { +public class SecurityAwareCustomMessageWrapper implements DiscoverySpiCustomMessage { /** Security subject ID. */ @Order(0) UUID secSubjId; /** Original message. */ - private DiscoveryCustomMessage delegate; - - /** */ - // TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 @Order(1) - Message msg; - - /** Serialized message bytes. */ - // TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 - @Order(2) - byte[] msgBytes; + DiscoveryCustomMessage delegate; /** Default constructor for {@link MessageFactory}. */ public SecurityAwareCustomMessageWrapper() { @@ -56,9 +42,6 @@ public SecurityAwareCustomMessageWrapper() { public SecurityAwareCustomMessageWrapper(DiscoveryCustomMessage delegate, UUID secSubjId) { this.delegate = delegate; this.secSubjId = secSubjId; - - if (delegate instanceof Message) - msg = (Message)delegate; } /** Gets security Subject ID. */ @@ -80,7 +63,7 @@ public UUID securitySubjectId() { * @return Delegate. */ public DiscoveryCustomMessage delegate() { - return msg != null ? (DiscoveryCustomMessage)msg : delegate; + return delegate; } /** {@inheritDoc} */ @@ -89,16 +72,4 @@ public DiscoveryCustomMessage delegate() { return ack == null ? null : new SecurityAwareCustomMessageWrapper(ack, secSubjId); } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { - if (!(delegate instanceof Message)) - msgBytes = U.marshal(marsh, delegate); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { - if (msgBytes != null) - delegate = U.unmarshal(marsh, msgBytes, clsLdr); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 0fce9fac5bb8d..404b24daa4112 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -500,35 +500,28 @@ else if (state == DISCONNECTED) { if (state == STOPPED || state == SEGMENTED || state == STARTING) throw new IgniteException("Failed to send custom message: client is " + state.name().toLowerCase() + "."); - try { - TcpDiscoveryCustomEventMessage msg; - - DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt); + TcpDiscoveryCustomEventMessage msg; - if (customMsg instanceof DiscoveryServerOnlyCustomMessage) - msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt); - else - msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt); + DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt); - Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass())) - .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString()) - .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), - () -> locNode.consistentId().toString()) - .addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName()) - .addLog(() -> "Created"); + if (customMsg instanceof DiscoveryServerOnlyCustomMessage) + msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt); + else + msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt); - // This root span will be parent both from local and remote nodes. - msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); + Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass())) + .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString()) + .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), + () -> locNode.consistentId().toString()) + .addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName()) + .addLog(() -> "Created"); - msg.prepareMarshal(spi.marshaller()); + // This root span will be parent both from local and remote nodes. + msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); - sockWriter.sendMessage(msg); + sockWriter.sendMessage(msg); - rootSpan.addLog(() -> "Sent").end(); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); - } + rootSpan.addLog(() -> "Sent").end(); } /** {@inheritDoc} */ @@ -2595,17 +2588,8 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId); if (node != null && node.visible()) { - try { - msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); - - DiscoverySpiCustomMessage msgObj = msg.message(); - - notifyDiscovery( - EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj, msg.spanContainer()); - } - catch (Throwable e) { - U.error(log, "Failed to unmarshal discovery custom message.", e); - } + notifyDiscovery( + EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msg.message(), msg.spanContainer()); } else if (log.isDebugEnabled()) log.debug("Received metrics from unknown node: " + nodeId); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index cc87595684b87..17bda88dcbc0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1017,35 +1017,28 @@ private void interruptPing(TcpDiscoveryNode node) { /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { - try { - TcpDiscoveryCustomEventMessage msg; + TcpDiscoveryCustomEventMessage msg; - DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt); + DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt); - if (customMsg instanceof DiscoveryServerOnlyCustomMessage) - msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt); - else - msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt); + if (customMsg instanceof DiscoveryServerOnlyCustomMessage) + msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt); + else + msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt); - Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass())) - .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString()) - .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), - () -> locNode.consistentId().toString()) - .addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName()) - .addLog(() -> "Created"); + Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass())) + .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString()) + .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID), + () -> locNode.consistentId().toString()) + .addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName()) + .addLog(() -> "Created"); - // This root span will be parent both from local and remote nodes. - msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); + // This root span will be parent both from local and remote nodes. + msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan)); - msg.prepareMarshal(spi.marshaller()); - - msgWorker.addMessage(msg); + msgWorker.addMessage(msg); - rootSpan.addLog(() -> "Sent").end(); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to marshal custom event: " + evt, e); - } + rootSpan.addLog(() -> "Sent").end(); } /** {@inheritDoc} */ @@ -2556,9 +2549,6 @@ else if (msg instanceof TcpDiscoveryCustomEventMessage) { TcpDiscoveryCustomEventMessage msg0 = (TcpDiscoveryCustomEventMessage)msg; msg = new TcpDiscoveryCustomEventMessage(msg0); - - // We shoulgn't store deserialized message in the queue because of msg is transient. - ((TcpDiscoveryCustomEventMessage)msg).clearMessage(); } synchronized (msgs) { @@ -6106,40 +6096,22 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa processCustomMessage(msg, waitForNotification); } } - - msg.clearMessage(); } else { addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true)); - DiscoverySpiCustomMessage msgObj = null; + DiscoverySpiCustomMessage customMsg = msg.message(); - try { - msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); - - msgObj = msg.message(); - } - catch (Throwable e) { - U.error(log, "Failed to unmarshal discovery custom message.", e); - } - - if (msgObj != null) { - DiscoverySpiCustomMessage nextMsg = msgObj.ackMessage(); + if (customMsg != null) { + DiscoverySpiCustomMessage nextMsg = customMsg.ackMessage(); if (nextMsg != null) { - try { - TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage( - getLocalNodeId(), nextMsg); - - ackMsg.topologyVersion(msg.topologyVersion()); + TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage( + getLocalNodeId(), nextMsg); - ackMsg.prepareMarshal(spi.marshaller()); + ackMsg.topologyVersion(msg.topologyVersion()); - processCustomMessage(ackMsg, waitForNotification); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to marshal discovery custom message.", e); - } + processCustomMessage(ackMsg, waitForNotification); } } } @@ -6165,8 +6137,6 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa notifyDiscoveryListener(msg, waitForNotification); } - msg.clearMessage(); - if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } @@ -6303,16 +6273,7 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean if (node == null) return; - DiscoverySpiCustomMessage msgObj; - - try { - msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); - - msgObj = msg.message(); - } - catch (Throwable t) { - throw new IgniteException("Failed to unmarshal discovery custom message: " + msg, t); - } + DiscoverySpiCustomMessage customMsg = msg.message(); IgniteFuture fut = lsnr.onDiscovery( new DiscoveryNotification( @@ -6321,13 +6282,13 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean node, snapshot, hist, - msgObj, + customMsg, msg.spanContainer()) ); notifiedDiscovery.set(true); - if (waitForNotification || msgObj.isMutable()) { + if (waitForNotification || customMsg.isMutable()) { blockingSectionBegin(); try { @@ -6337,15 +6298,6 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean blockingSectionEnd(); } } - - if (msgObj.isMutable()) { - try { - msg.prepareMarshal(spi.marshaller()); - } - catch (Throwable t) { - throw new IgniteException("Failed to marshal mutable discovery message: " + msgObj, t); - } - } } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 30e9b1b73f810..6be18c60c408e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -62,19 +62,9 @@ public class TcpDiscoveryIoSession { /** Size for an intermediate buffer for serializing discovery messages. */ private static final int MSG_BUFFER_SIZE = 100; - /** Leading byte for messages use {@link JdkMarshaller} for serialization. */ - // TODO: remove these flags after refactoring all discovery messages. - static final byte JAVA_SERIALIZATION = (byte)1; - - /** Leading byte for messages use {@link MessageSerializer} for serialization. */ - static final byte MESSAGE_SERIALIZATION = (byte)2; - /** */ final TcpDiscoverySpi spi; - /** Loads discovery messages classes during java deserialization. */ - private final ClassLoader clsLdr; - /** */ private final Socket sock; @@ -104,8 +94,6 @@ public class TcpDiscoveryIoSession { this.sock = sock; this.spi = spi; - clsLdr = U.resolveClassLoader(spi.ignite().configuration()); - msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); msgWriter = new DirectMessageWriter(spi.messageFactory()); @@ -130,17 +118,7 @@ public class TcpDiscoveryIoSession { * @throws IgniteCheckedException If serialization fails. */ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { - if (!(msg instanceof Message)) { - out.write(JAVA_SERIALIZATION); - - U.marshal(spi.marshaller(), msg, out); - - return; - } - try { - out.write(MESSAGE_SERIALIZATION); - serializeMessage((Message)msg, out); out.flush(); @@ -162,21 +140,23 @@ void writeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException * @throws IgniteCheckedException If deserialization fails. */ T readMessage() throws IgniteCheckedException, IOException { - byte serMode = (byte)in.read(); + try { + byte b0 = (byte)in.read(); + byte b1 = (byte)in.read(); - if (JAVA_SERIALIZATION == serMode) - return U.unmarshal(spi.marshaller(), in, clsLdr); + short msgType = makeMessageType(b0, b1); - try { - if (MESSAGE_SERIALIZATION != serMode) { - detectSslAlert(serMode, in); + Message msg; - // IOException type is important for ServerImpl. It may search the cause (X.hasCause). - // The connection error processing behavior depends on it. - throw new IOException("Received unexpected byte while reading discovery message: " + serMode); + try { + msg = spi.messageFactory().create(msgType); } + catch (IgniteException e) { + detectSslAlert(b0, b1, in); - Message msg = spi.messageFactory().create(makeMessageType((byte)in.read(), (byte)in.read())); + // 'Invalid message type' should not be lost. + throw e; + } msgReader.reset(); msgReader.setBuffer(msgBuf); @@ -273,12 +253,13 @@ void serializeMessage(Message m, OutputStream out) throws IOException { * See handling {@code StreamCorruptedException} in {@link #readMessage()}. * Keeps logic similar to {@link java.io.ObjectInputStream#readStreamHeader}. */ - private void detectSslAlert(byte firstByte, InputStream in) throws IOException { + private void detectSslAlert(byte b0, byte b1, InputStream in) throws IOException { byte[] hdr = new byte[4]; - hdr[0] = firstByte; - int read = in.readNBytes(hdr, 1, 3); + hdr[0] = b0; + hdr[1] = b1; + int read = in.readNBytes(hdr, 2, 2); - if (read < 3) + if (read < 2) throw new EOFException(); String hex = String.format("%02x%02x%02x%02x", hdr[0], hdr[1], hdr[2], hdr[3]); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java index 1c9e1bcc69c13..8c871f9e2eb46 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMessageSerializer.java @@ -22,7 +22,6 @@ import java.io.OutputStream; import java.net.Socket; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -59,9 +58,6 @@ public TcpDiscoveryMessageSerializer(TcpDiscoverySpi spi) { * @throws IOException If serialization fails. */ byte[] serializeMessage(TcpDiscoveryAbstractMessage msg) throws IgniteCheckedException, IOException { - if (!(msg instanceof Message)) - return U.marshal(spi.marshaller(), msg); - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { serializeMessage((Message)msg, out); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 2fa700d9c7f2f..903524c8aa96b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -71,7 +71,6 @@ import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; @@ -1705,13 +1704,6 @@ protected void writeToSocket( try (SocketTimeoutObject ignored = startTimer(sock, timeout)) { OutputStream out = sock.getOutputStream(); - // Write Ignite header without leading byte. - if (msg != null) { - byte mode = msg instanceof Message ? TcpDiscoveryIoSession.MESSAGE_SERIALIZATION : TcpDiscoveryIoSession.JAVA_SERIALIZATION; - - out.write(mode); - } - out.write(data); out.flush(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index e4ed7cafb8b50..09784f49661f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -19,34 +19,21 @@ import java.util.Objects; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; -import org.jetbrains.annotations.Nullable; /** - * Wrapper for custom message. + * Wrapper for {@link DiscoveryCustomMessage}. */ @TcpDiscoveryRedirectToClient @TcpDiscoveryEnsureDelivery public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractTraceableMessage { /** */ - private volatile DiscoverySpiCustomMessage msg; - - /** Serialized message bytes. */ - // TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 @Order(0) - volatile @Nullable byte[] msgBytes; - - /** {@link Message} representation of original message. */ - // TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 - @Order(1) - volatile @Nullable Message serMsg; + DiscoverySpiCustomMessage msg; /** * Constructor for {@link MessageFactory}. @@ -72,18 +59,9 @@ public TcpDiscoveryCustomEventMessage(UUID creatorNodeId, DiscoverySpiCustomMess public TcpDiscoveryCustomEventMessage(TcpDiscoveryCustomEventMessage msg) { super(msg); - msgBytes = msg.msgBytes; - serMsg = msg.serMsg; this.msg = msg.msg; } - /** - * Clear deserialized form of wrapped message. - */ - public void clearMessage() { - msg = null; - } - /** * @return Original message. */ @@ -91,49 +69,11 @@ public DiscoverySpiCustomMessage message() { return msg; } - /** - * Prepare message for serialization. - * - * @param marsh Marshaller. - */ - // TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { - super.prepareMarshal(marsh); - - if (msg instanceof Message) - serMsg = (Message)msg; - else { - if (msg != null) - msgBytes = U.marshal(marsh, msg); - } - } - - /** - * Finish deserialization. - * - * @param marsh Marshaller. - * @param ldr Class loader. - */ - // TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627 - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(marsh, ldr); - - if (msg != null) - return; - - if (serMsg != null) - msg = (DiscoverySpiCustomMessage)serMsg; - else { - if (msgBytes != null) - msg = U.unmarshal(marsh, msgBytes, ldr); - } - } - /** {@inheritDoc} */ @Override public boolean equals(Object obj) { return super.equals(obj) && obj instanceof TcpDiscoveryCustomEventMessage && - Objects.equals(((TcpDiscoveryCustomEventMessage)obj).verifierNodeId(), verifierNodeId()); + Objects.equals(((TcpDiscoveryAbstractMessage)obj).verifierNodeId(), verifierNodeId()); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java index a900dc423b204..8619509d5cd34 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/security/NodeSecurityContextPropagationTest.java @@ -53,7 +53,6 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; -import static org.apache.ignite.testframework.GridTestUtils.getFieldValue; import static org.apache.ignite.testframework.GridTestUtils.runAsync; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -185,7 +184,7 @@ private boolean anyReceivedMessageMatch(IgniteEx ignite, Predicate predi Object unwrappedMsg = msg; if (msg instanceof TcpDiscoveryCustomEventMessage) { - DiscoverySpiCustomMessage customMsg = getFieldValue(msg, "serMsg"); + DiscoverySpiCustomMessage customMsg = ((TcpDiscoveryCustomEventMessage)msg).message(); assert customMsg instanceof SecurityAwareCustomMessageWrapper; diff --git a/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java index bda1aebcd2285..2032d0acd4feb 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/MessagesPluginProvider.java @@ -20,7 +20,10 @@ import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.CoreMessagesProvider; +import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.Marshallers; import org.apache.ignite.plugin.AbstractTestPluginProvider; import org.apache.ignite.plugin.ExtensionRegistry; import org.apache.ignite.plugin.PluginContext; @@ -81,10 +84,18 @@ public MessagesPluginProvider(Class... msgs) { /** */ private MessageSerializer loadSerializer(Class msgCls) { try { + boolean marsh = MarshallableMessage.class.isAssignableFrom(msgCls); + + String clsPref = msgCls.getSimpleName() + (marsh ? "Marshallable" : ""); + Class serCls = U.gridClassLoader() - .loadClass(msgCls.getPackage().getName() + "." + msgCls.getSimpleName() + "Serializer"); + .loadClass(msgCls.getPackage().getName() + "." + clsPref + "Serializer"); + + Object msgSer = marsh ? serCls.getConstructor(Marshaller.class, ClassLoader.class) + .newInstance(Marshallers.jdk(), U.gridClassLoader()) : + U.newInstance(serCls); - return (MessageSerializer)U.newInstance(serCls); + return (MessageSerializer)msgSer; } catch (Exception e) { throw new RuntimeException("Unable to find serializer for message: " + msgCls, e); diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java index b8a82d52cc90b..5f9ca341d76c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryUnmarshalVulnerabilityTest.java @@ -19,18 +19,16 @@ import java.io.BufferedOutputStream; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.OutputStream; -import java.io.Serializable; import java.net.InetAddress; import java.net.Socket; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshallers; -import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.spi.MessagesPluginProvider; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -45,11 +43,8 @@ */ @WithSystemProperty(key = IGNITE_ENABLE_OBJECT_INPUT_FILTER_AUTOCONFIGURATION, value = "false") public class DiscoveryUnmarshalVulnerabilityTest extends GridCommonAbstractTest { - /** Marshaller. */ - private static final JdkMarshaller MARSH = Marshallers.jdk(); - /** Shared value. */ - private static final AtomicBoolean SHARED = new AtomicBoolean(); + static final AtomicBoolean SHARED = new AtomicBoolean(); /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { @@ -63,6 +58,12 @@ public class DiscoveryUnmarshalVulnerabilityTest extends GridCommonAbstractTest IgniteUtils.clearClassCache(); } + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setPluginProviders(new MessagesPluginProvider(ExploitMessage.class)); + } + /** * @throws Exception If failed. */ @@ -139,7 +140,7 @@ private void testExploit(boolean positive) throws Exception { try { startGrid(); - attack(marshal(new Exploit())); + attack(Marshallers.jdk().marshal(new Exploit())); boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { @@ -157,13 +158,6 @@ private void testExploit(boolean positive) throws Exception { } } - /** - * @param obj Object. - */ - private static byte[] marshal(Object obj) throws IgniteCheckedException { - return MARSH.marshal(obj); - } - /** * @param data Data. */ @@ -175,18 +169,7 @@ private void attack(byte[] data) throws IOException { OutputStream oos = new BufferedOutputStream(sock.getOutputStream()) ) { oos.write(U.IGNITE_HEADER); - oos.write((byte)1); // Flag for java serialization. oos.write(data); } } - - /** */ - private static class Exploit implements Serializable { - /** - * @param is Input stream. - */ - private void readObject(ObjectInputStream is) throws ClassNotFoundException, IOException { - SHARED.set(true); - } - } } diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/Exploit.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/Exploit.java new file mode 100644 index 0000000000000..02279fd332033 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/Exploit.java @@ -0,0 +1,17 @@ +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; + +import static org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest.SHARED; + +/** */ +class Exploit implements Serializable { + /** + * @param is Input stream. + */ + private void readObject(ObjectInputStream is) throws ClassNotFoundException, IOException { + SHARED.set(true); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ExploitMessage.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ExploitMessage.java new file mode 100644 index 0000000000000..fa627d593a7fc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ExploitMessage.java @@ -0,0 +1,36 @@ +package org.apache.ignite.spi.discovery.tcp; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.MarshallableMessage; +import org.apache.ignite.internal.Order; +import org.apache.ignite.marshaller.Marshaller; + +import static org.apache.ignite.marshaller.Marshallers.jdk; + +/** */ +class ExploitMessage implements MarshallableMessage { + /** */ + @Order(0) + byte[] exploitBytes; + + /** */ + private Exploit exploit; + + /** */ + public ExploitMessage() {} + + /** */ + public ExploitMessage(Exploit exploit) { + this.exploit = exploit; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { + exploitBytes = jdk().marshal(exploit); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { + exploit = jdk().unmarshal(exploitBytes, clsLdr); + } +}