Skip to content

Commit be11412

Browse files
committed
IGNITE-28520 Move prepareMarshal / finishUnmarshal out of NIO communication thread
1 parent 913d638 commit be11412

File tree

4 files changed

+327
-1
lines changed

4 files changed

+327
-1
lines changed

modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ public class MessageSerializerGenerator {
114114
/** The marshallable message type. */
115115
private final TypeMirror marshallableMsgType;
116116

117+
/** Collection of lines for the {@code prepareMarshalCacheObjects} method. Empty when the message has no such fields. */
118+
private final List<String> prepareCacheObjects = new ArrayList<>();
119+
120+
/** Collection of lines for the {@code finishUnmarshalCacheObjects} method. Empty when the message has no such fields. */
121+
private final List<String> finishCacheObjects = new ArrayList<>();
122+
117123
/** */
118124
private int indent;
119125

@@ -189,6 +195,20 @@ private String generateSerializerCode(String serClsName) throws IOException {
189195

190196
writer.write(TAB + "}" + NL);
191197

198+
// Write #prepareMarshalCacheObjects / #finishUnmarshalCacheObjects overrides if the message has any
199+
// cache-object-bearing @Order fields (IGNITE-28520).
200+
if (!prepareCacheObjects.isEmpty()) {
201+
writer.write(NL);
202+
203+
for (String p: prepareCacheObjects)
204+
writer.write(p + NL);
205+
206+
writer.write(NL);
207+
208+
for (String f: finishCacheObjects)
209+
writer.write(f + NL);
210+
}
211+
192212
writer.write("}");
193213

194214
writer.write(NL);
@@ -242,6 +262,178 @@ private void generateMethods(List<VariableElement> fields) throws Exception {
242262

243263
finish(write, false, false);
244264
finish(read, true, marshallableMessage());
265+
266+
generateCacheObjectMethods(fields);
267+
}
268+
269+
/**
270+
* Generates bodies of {@code prepareMarshalCacheObjects} / {@code finishUnmarshalCacheObjects} overrides when the
271+
* message carries at least one {@code @Order}-annotated field whose declared type is {@code CacheObject},
272+
* {@code KeyCacheObject}, a {@code Collection} of them, or an array of them. Traversal is single-level: it does
273+
* not recurse into nested messages or enter map values. See IGNITE-28520.
274+
*/
275+
private void generateCacheObjectMethods(List<VariableElement> fields) throws Exception {
276+
List<VariableElement> cacheObjFields = new ArrayList<>();
277+
278+
for (VariableElement field: fields) {
279+
if (isCacheObjectBearingField(field.asType()))
280+
cacheObjFields.add(field);
281+
}
282+
283+
if (cacheObjFields.isEmpty())
284+
return;
285+
286+
imports.add("org.apache.ignite.IgniteCheckedException");
287+
imports.add("org.apache.ignite.internal.processors.cache.CacheObjectValueContext");
288+
289+
startCacheObjectMethod(prepareCacheObjects, true);
290+
startCacheObjectMethod(finishCacheObjects, false);
291+
292+
indent++;
293+
294+
for (VariableElement field: cacheObjFields) {
295+
emitCacheObjectCall(prepareCacheObjects, field, true);
296+
emitCacheObjectCall(finishCacheObjects, field, false);
297+
}
298+
299+
indent--;
300+
301+
prepareCacheObjects.add(identedLine("}"));
302+
finishCacheObjects.add(identedLine("}"));
303+
}
304+
305+
/**
306+
* @return {@code true} if the given field type is a {@code CacheObject}/{@code KeyCacheObject}, a collection of
307+
* them, or an array of them. Maps and nested messages are intentionally excluded — see IGNITE-28520 scope.
308+
*/
309+
private boolean isCacheObjectBearingField(TypeMirror type) {
310+
if (type.getKind() == TypeKind.ARRAY) {
311+
TypeMirror comp = ((ArrayType)type).getComponentType();
312+
313+
return comp.getKind() == TypeKind.DECLARED && isCacheObjectType(comp);
314+
}
315+
316+
if (type.getKind() != TypeKind.DECLARED)
317+
return false;
318+
319+
if (isCacheObjectType(type))
320+
return true;
321+
322+
// Map is intentionally skipped even though Map is Collection-unrelated — exclude it explicitly for clarity.
323+
if (assignableFrom(erasedType(type), type(Map.class.getName())))
324+
return false;
325+
326+
if (assignableFrom(erasedType(type), type(Collection.class.getName()))) {
327+
List<? extends TypeMirror> typeArgs = ((DeclaredType)type).getTypeArguments();
328+
329+
if (typeArgs.size() == 1) {
330+
TypeMirror arg = typeArgs.get(0);
331+
332+
return arg.getKind() == TypeKind.DECLARED && isCacheObjectType(arg);
333+
}
334+
}
335+
336+
return false;
337+
}
338+
339+
/** @return {@code true} if {@code type} is assignable to {@code CacheObject} (this also covers {@code KeyCacheObject}). */
340+
private boolean isCacheObjectType(TypeMirror type) {
341+
return assignableFrom(type, type("org.apache.ignite.internal.processors.cache.CacheObject"));
342+
}
343+
344+
/** Emits method signature and opening brace for cache-object marshalling methods. */
345+
private void startCacheObjectMethod(List<String> code, boolean prepare) {
346+
indent = 1;
347+
348+
code.add(identedLine(METHOD_JAVADOC));
349+
350+
if (prepare) {
351+
code.add(identedLine(
352+
"@Override public void prepareMarshalCacheObjects(" + type.getSimpleName() +
353+
" msg, CacheObjectValueContext ctx) throws IgniteCheckedException {"));
354+
}
355+
else {
356+
code.add(identedLine(
357+
"@Override public void finishUnmarshalCacheObjects(" + type.getSimpleName() +
358+
" msg, CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException {"));
359+
}
360+
}
361+
362+
/**
363+
* Emits a single field traversal statement — a guarded {@code prepareMarshal} / {@code finishUnmarshal} call
364+
* for a direct {@code CacheObject} field, or a null-safe loop for a collection / array of them.
365+
*/
366+
private void emitCacheObjectCall(List<String> code, VariableElement field, boolean prepare) {
367+
String mtd = prepare ? "prepareMarshal(ctx)" : "finishUnmarshal(ctx, ldr)";
368+
369+
String accessor = fieldAccessor(field);
370+
371+
TypeMirror type = field.asType();
372+
373+
if (type.getKind() == TypeKind.DECLARED && isCacheObjectType(type)) {
374+
code.add(identedLine("if (%s != null)", accessor));
375+
376+
indent++;
377+
378+
code.add(identedLine("%s.%s;", accessor, mtd));
379+
380+
indent--;
381+
}
382+
else {
383+
// Collection or array of CacheObject — iterate with a null-check on both the container and the element.
384+
code.add(identedLine("if (%s != null) {", accessor));
385+
386+
indent++;
387+
388+
String elementType = "org.apache.ignite.internal.processors.cache.KeyCacheObject";
389+
390+
if (type.getKind() == TypeKind.ARRAY) {
391+
TypeMirror comp = ((ArrayType)type).getComponentType();
392+
393+
if (!assignableFrom(comp, type(elementType)))
394+
elementType = "org.apache.ignite.internal.processors.cache.CacheObject";
395+
}
396+
else {
397+
TypeMirror arg = ((DeclaredType)type).getTypeArguments().get(0);
398+
399+
if (!assignableFrom(arg, type(elementType)))
400+
elementType = "org.apache.ignite.internal.processors.cache.CacheObject";
401+
}
402+
403+
imports.add(elementType);
404+
405+
String simpleElement = elementType.substring(elementType.lastIndexOf('.') + 1);
406+
407+
code.add(identedLine("for (%s obj : %s) {", simpleElement, accessor));
408+
409+
indent++;
410+
411+
code.add(identedLine("if (obj != null)"));
412+
413+
indent++;
414+
415+
code.add(identedLine("obj.%s;", mtd));
416+
417+
indent--;
418+
419+
indent--;
420+
421+
code.add(identedLine("}"));
422+
423+
indent--;
424+
425+
code.add(identedLine("}"));
426+
}
427+
}
428+
429+
/** Returns the field access expression, taking superclass-field access into account. */
430+
private String fieldAccessor(VariableElement field) {
431+
String name = field.getSimpleName().toString();
432+
433+
if (type.equals(field.getEnclosingElement()))
434+
return "msg." + name;
435+
436+
return "((" + field.getEnclosingElement().getSimpleName() + ")msg)." + name;
245437
}
246438

247439
/**

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
4242
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
4343
import org.apache.ignite.internal.managers.communication.GridMessageListener;
44+
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
4445
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
4546
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
4647
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
@@ -1150,13 +1151,81 @@ private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws I
11501151
if (destNodeId == null || !cctx.localNodeId().equals(destNodeId)) {
11511152
msg.prepareMarshal(cctx);
11521153

1154+
prepareMarshalGeneratedCacheObjects(msg);
1155+
11531156
if (msg instanceof GridCacheDeployable && msg.addDeploymentInfo())
11541157
cctx.deploy().prepare((GridCacheDeployable)msg);
11551158
}
11561159

11571160
return true;
11581161
}
11591162

1163+
/**
1164+
* Invokes {@link MessageSerializer#prepareMarshalCacheObjects} on the user thread for the given message, as part
1165+
* of the IGNITE-28520 Phase 1 two-phase marshalling. The generated serializer method walks {@code @Order}-annotated
1166+
* {@link CacheObject}/{@link org.apache.ignite.internal.processors.cache.KeyCacheObject} fields of {@code msg}
1167+
* (including single-level collections and arrays) and calls
1168+
* {@link CacheObject#prepareMarshal(CacheObjectValueContext)} on each, so that the NIO communication thread never
1169+
* has to do this work.
1170+
*
1171+
* <p>This call is intentionally <b>additive</b> to the existing
1172+
* {@link GridCacheMessage#prepareMarshal(GridCacheSharedContext)} override path: it runs right after the manual
1173+
* override. For messages whose override has already populated {@code valBytes} of every cache object field, this
1174+
* call becomes a no-op thanks to the idempotency guard inside
1175+
* {@link org.apache.ignite.internal.processors.cache.CacheObjectImpl#prepareMarshal(CacheObjectValueContext)}
1176+
* ({@code if (valBytes == null)}). For messages that don't override {@code prepareMarshal(GridCacheSharedContext)}
1177+
* but do carry cache-object fields, the generated method is what actually does the marshalling work.
1178+
*
1179+
* <p>Only single-cache messages ({@link GridCacheIdMessage}) are processed here — cross-cache messages resolve
1180+
* their per-entry cache contexts internally via their hand-written overrides, and we don't try to second-guess
1181+
* them from a single shared context.
1182+
*
1183+
* @param msg Message being sent.
1184+
* @throws IgniteCheckedException If the generated serializer fails to marshal a cache object field.
1185+
*/
1186+
@SuppressWarnings({"unchecked", "rawtypes"})
1187+
private void prepareMarshalGeneratedCacheObjects(GridCacheMessage msg) throws IgniteCheckedException {
1188+
if (!(msg instanceof GridCacheIdMessage))
1189+
return;
1190+
1191+
CacheObjectContext cacheObjCtx = cctx.cacheObjectContext(((GridCacheIdMessage)msg).cacheId());
1192+
1193+
if (cacheObjCtx == null)
1194+
return;
1195+
1196+
MessageSerializer ser = cctx.gridIO().messageFactory().serializer(msg.directType());
1197+
1198+
if (ser != null)
1199+
ser.prepareMarshalCacheObjects(msg, cacheObjCtx);
1200+
}
1201+
1202+
/**
1203+
* Invokes {@link MessageSerializer#finishUnmarshalCacheObjects} on the user (listener-dispatch) thread — the
1204+
* receive-side mirror of {@link #prepareMarshalGeneratedCacheObjects}. See that method's docs for the full
1205+
* contract. This call is also additive to the manual
1206+
* {@link GridCacheMessage#finishUnmarshal(GridCacheSharedContext, ClassLoader)} override path and idempotent for
1207+
* cache objects whose values have already been deserialized.
1208+
*
1209+
* @param msg Message being received.
1210+
* @param ldr Class loader used for value deserialization.
1211+
* @throws IgniteCheckedException If the generated serializer fails to unmarshal a cache object field.
1212+
*/
1213+
@SuppressWarnings({"unchecked", "rawtypes"})
1214+
private void finishUnmarshalGeneratedCacheObjects(GridCacheMessage msg, ClassLoader ldr) throws IgniteCheckedException {
1215+
if (!(msg instanceof GridCacheIdMessage))
1216+
return;
1217+
1218+
CacheObjectContext cacheObjCtx = cctx.cacheObjectContext(((GridCacheIdMessage)msg).cacheId());
1219+
1220+
if (cacheObjCtx == null)
1221+
return;
1222+
1223+
MessageSerializer ser = cctx.gridIO().messageFactory().serializer(msg.directType());
1224+
1225+
if (ser != null)
1226+
ser.finishUnmarshalCacheObjects(msg, cacheObjCtx, ldr);
1227+
}
1228+
11601229
/**
11611230
* @param nodeId Node ID.
11621231
* @param sndErr Send error.
@@ -1604,7 +1673,11 @@ private void unmarshall(UUID nodeId, GridCacheMessage cacheMsg) {
16041673
log.debug("Set P2P context [senderId=" + nodeId + ", msg=" + cacheMsg + ']');
16051674
}
16061675

1607-
cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader());
1676+
ClassLoader ldr = cctx.deploy().globalLoader();
1677+
1678+
cacheMsg.finishUnmarshal(cctx, ldr);
1679+
1680+
finishUnmarshalGeneratedCacheObjects(cacheMsg, ldr);
16081681
}
16091682
catch (IgniteCheckedException e) {
16101683
cacheMsg.onClassError(e);

modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageSerializer.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.ignite.plugin.extensions.communication;
1919

20+
import org.apache.ignite.IgniteCheckedException;
21+
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
22+
2023
/**
2124
* Interface for message serialization logic.
2225
*/
@@ -38,4 +41,44 @@ public interface MessageSerializer<M extends Message> {
3841
* @return Whether message was fully read.
3942
*/
4043
public boolean readFrom(M msg, MessageReader reader);
44+
45+
/**
46+
* Prepares {@link org.apache.ignite.internal.processors.cache.CacheObject} fields of the given message for
47+
* marshalling. This is the first phase of the two-phase marshalling introduced by IGNITE-28520: it is expected
48+
* to be invoked on a <b>user thread</b> before the message is handed off to the NIO worker, so that
49+
* {@link org.apache.ignite.internal.processors.cache.CacheObject#prepareMarshal(CacheObjectValueContext)} never
50+
* runs on the NIO communication thread (or, worse, the Discovery thread where it could deadlock).
51+
*
52+
* <p>The default implementation is a no-op. The generated serializer overrides it for messages that contain
53+
* at least one {@code @Order}-annotated field whose declared type is {@code CacheObject} / {@code KeyCacheObject}
54+
* (including {@code Collection<CacheObject>} and {@code CacheObject[]}). Only the top level is traversed — the
55+
* traversal does <b>not</b> recurse into nested messages or map values.
56+
*
57+
* @param msg Message instance.
58+
* @param ctx Cache object value context.
59+
* @throws IgniteCheckedException If marshalling of any contained cache object fails.
60+
*/
61+
public default void prepareMarshalCacheObjects(M msg, CacheObjectValueContext ctx) throws IgniteCheckedException {
62+
// No-op by default.
63+
}
64+
65+
/**
66+
* Finishes unmarshalling of {@link org.apache.ignite.internal.processors.cache.CacheObject} fields of the given
67+
* message. Intended to be invoked on a <b>user thread</b> (for example, on a listener dispatch thread) after
68+
* {@link #readFrom(Message, MessageReader)} returns {@code true} on the NIO worker, so that
69+
* {@link org.apache.ignite.internal.processors.cache.CacheObject#finishUnmarshal(CacheObjectValueContext, ClassLoader)}
70+
* never runs on the NIO communication thread.
71+
*
72+
* <p>The default implementation is a no-op. Mirrors the traversal rules of
73+
* {@link #prepareMarshalCacheObjects(Message, CacheObjectValueContext)}.
74+
*
75+
* @param msg Message instance.
76+
* @param ctx Cache object value context.
77+
* @param ldr Class loader to use for value deserialization.
78+
* @throws IgniteCheckedException If unmarshalling of any contained cache object fails.
79+
*/
80+
public default void finishUnmarshalCacheObjects(M msg, CacheObjectValueContext ctx, ClassLoader ldr)
81+
throws IgniteCheckedException {
82+
// No-op by default.
83+
}
4184
}

0 commit comments

Comments
 (0)