Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,20 @@
package org.apache.ignite.internal.managers.discovery;

import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.jetbrains.annotations.Nullable;

/** Custom message wrapper with ID of security subject that initiated the current message. */
public class SecurityAwareCustomMessageWrapper implements DiscoverySpiCustomMessage, MarshallableMessage {
public class SecurityAwareCustomMessageWrapper implements DiscoverySpiCustomMessage {
/** Security subject ID. */
@Order(0)
UUID secSubjId;

/** Original message. */
private DiscoveryCustomMessage delegate;

/** */
// TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627
@Order(1)
Message msg;

/** Serialized message bytes. */
// TODO: Should be removed in https://issues.apache.org/jira/browse/IGNITE-27627
@Order(2)
byte[] msgBytes;
DiscoveryCustomMessage delegate;

/** Default constructor for {@link MessageFactory}. */
public SecurityAwareCustomMessageWrapper() {
Expand All @@ -56,9 +42,6 @@ public SecurityAwareCustomMessageWrapper() {
public SecurityAwareCustomMessageWrapper(DiscoveryCustomMessage delegate, UUID secSubjId) {
this.delegate = delegate;
this.secSubjId = secSubjId;

if (delegate instanceof Message)
msg = (Message)delegate;
}

/** Gets security Subject ID. */
Expand All @@ -80,7 +63,7 @@ public UUID securitySubjectId() {
* @return Delegate.
*/
public DiscoveryCustomMessage delegate() {
return msg != null ? (DiscoveryCustomMessage)msg : delegate;
return delegate;
}

/** {@inheritDoc} */
Expand All @@ -89,16 +72,4 @@ public DiscoveryCustomMessage delegate() {

return ack == null ? null : new SecurityAwareCustomMessageWrapper(ack, secSubjId);
}

/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
if (!(delegate instanceof Message))
msgBytes = U.marshal(marsh, delegate);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
if (msgBytes != null)
delegate = U.unmarshal(marsh, msgBytes, clsLdr);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -500,35 +500,28 @@ else if (state == DISCONNECTED) {
if (state == STOPPED || state == SEGMENTED || state == STARTING)
throw new IgniteException("Failed to send custom message: client is " + state.name().toLowerCase() + ".");

try {
TcpDiscoveryCustomEventMessage msg;

DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
TcpDiscoveryCustomEventMessage msg;

if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
else
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);
DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);

Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
() -> locNode.consistentId().toString())
.addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName())
.addLog(() -> "Created");
if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
else
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);

// This root span will be parent both from local and remote nodes.
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
() -> locNode.consistentId().toString())
.addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName())
.addLog(() -> "Created");

msg.prepareMarshal(spi.marshaller());
// This root span will be parent both from local and remote nodes.
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));

sockWriter.sendMessage(msg);
sockWriter.sendMessage(msg);

rootSpan.addLog(() -> "Sent").end();
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
}
rootSpan.addLog(() -> "Sent").end();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -2595,17 +2588,8 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);

if (node != null && node.visible()) {
try {
msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));

DiscoverySpiCustomMessage msgObj = msg.message();

notifyDiscovery(
EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msgObj, msg.spanContainer());
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
}
notifyDiscovery(
EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allVisibleNodes(), msg.message(), msg.spanContainer());
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1017,35 +1017,28 @@ private void interruptPing(TcpDiscoveryNode node) {

/** {@inheritDoc} */
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
try {
TcpDiscoveryCustomEventMessage msg;
TcpDiscoveryCustomEventMessage msg;

DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);
DiscoverySpiCustomMessage customMsg = U.unwrapCustomMessage(evt);

if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
else
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);
if (customMsg instanceof DiscoveryServerOnlyCustomMessage)
msg = new TcpDiscoveryServerOnlyCustomEventMessage(getLocalNodeId(), evt);
else
msg = new TcpDiscoveryCustomEventMessage(getLocalNodeId(), evt);

Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
() -> locNode.consistentId().toString())
.addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName())
.addLog(() -> "Created");
Span rootSpan = tracing.create(TraceableMessagesTable.traceName(msg.getClass()))
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> getLocalNodeId().toString())
.addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.CONSISTENT_ID),
() -> locNode.consistentId().toString())
.addTag(SpanTags.MESSAGE_CLASS, () -> customMsg.getClass().getSimpleName())
.addLog(() -> "Created");

// This root span will be parent both from local and remote nodes.
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));
// This root span will be parent both from local and remote nodes.
msg.spanContainer().serializedSpanBytes(tracing.serialize(rootSpan));

msg.prepareMarshal(spi.marshaller());

msgWorker.addMessage(msg);
msgWorker.addMessage(msg);

rootSpan.addLog(() -> "Sent").end();
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom event: " + evt, e);
}
rootSpan.addLog(() -> "Sent").end();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -2556,9 +2549,6 @@ else if (msg instanceof TcpDiscoveryCustomEventMessage) {
TcpDiscoveryCustomEventMessage msg0 = (TcpDiscoveryCustomEventMessage)msg;

msg = new TcpDiscoveryCustomEventMessage(msg0);

// We shoulgn't store deserialized message in the queue because of msg is transient.
((TcpDiscoveryCustomEventMessage)msg).clearMessage();
}

synchronized (msgs) {
Expand Down Expand Up @@ -6106,40 +6096,22 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa
processCustomMessage(msg, waitForNotification);
}
}

msg.clearMessage();
}
else {
addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id(), true));

DiscoverySpiCustomMessage msgObj = null;
DiscoverySpiCustomMessage customMsg = msg.message();

try {
msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));

msgObj = msg.message();
}
catch (Throwable e) {
U.error(log, "Failed to unmarshal discovery custom message.", e);
}

if (msgObj != null) {
DiscoverySpiCustomMessage nextMsg = msgObj.ackMessage();
if (customMsg != null) {
DiscoverySpiCustomMessage nextMsg = customMsg.ackMessage();

if (nextMsg != null) {
try {
TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage(
getLocalNodeId(), nextMsg);

ackMsg.topologyVersion(msg.topologyVersion());
TcpDiscoveryCustomEventMessage ackMsg = new TcpDiscoveryCustomEventMessage(
getLocalNodeId(), nextMsg);

ackMsg.prepareMarshal(spi.marshaller());
ackMsg.topologyVersion(msg.topologyVersion());

processCustomMessage(ackMsg, waitForNotification);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to marshal discovery custom message.", e);
}
processCustomMessage(ackMsg, waitForNotification);
}
}
}
Expand All @@ -6165,8 +6137,6 @@ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean wa
notifyDiscoveryListener(msg, waitForNotification);
}

msg.clearMessage();

if (sendMessageToRemotes(msg))
sendMessageAcrossRing(msg);
}
Expand Down Expand Up @@ -6303,16 +6273,7 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean
if (node == null)
return;

DiscoverySpiCustomMessage msgObj;

try {
msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration()));

msgObj = msg.message();
}
catch (Throwable t) {
throw new IgniteException("Failed to unmarshal discovery custom message: " + msg, t);
}
DiscoverySpiCustomMessage customMsg = msg.message();

IgniteFuture<?> fut = lsnr.onDiscovery(
new DiscoveryNotification(
Expand All @@ -6321,13 +6282,13 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean
node,
snapshot,
hist,
msgObj,
customMsg,
msg.spanContainer())
);

notifiedDiscovery.set(true);

if (waitForNotification || msgObj.isMutable()) {
if (waitForNotification || customMsg.isMutable()) {
blockingSectionBegin();

try {
Expand All @@ -6337,15 +6298,6 @@ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean
blockingSectionEnd();
}
}

if (msgObj.isMutable()) {
try {
msg.prepareMarshal(spi.marshaller());
}
catch (Throwable t) {
throw new IgniteException("Failed to marshal mutable discovery message: " + msgObj, t);
}
}
}
}

Expand Down
Loading
Loading