diff --git a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java index 893a1ccaa4878..4cfd55f338c57 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java @@ -38,6 +38,7 @@ import javax.annotation.processing.ProcessingEnvironment; import javax.lang.model.element.Element; import javax.lang.model.element.ElementKind; +import javax.lang.model.element.Modifier; import javax.lang.model.element.QualifiedNameable; import javax.lang.model.element.TypeElement; import javax.lang.model.element.VariableElement; @@ -114,6 +115,15 @@ public class MessageSerializerGenerator { /** The marshallable message type. */ private final TypeMirror marshallableMsgType; + /** */ + private final List prepareCacheObjects = new ArrayList<>(); + + /** */ + private final List finishCacheObjects = new ArrayList<>(); + + /** Map of {@code } for recursion into nested messages. */ + private final LinkedHashMap nestedSerializerFields = new LinkedHashMap<>(); + /** */ private int indent; @@ -189,6 +199,18 @@ private String generateSerializerCode(String serClsName) throws IOException { writer.write(TAB + "}" + NL); + if (!prepareCacheObjects.isEmpty()) { + writer.write(NL); + + for (String p: prepareCacheObjects) + writer.write(p + NL); + + writer.write(NL); + + for (String f: finishCacheObjects) + writer.write(f + NL); + } + writer.write("}"); writer.write(NL); @@ -242,6 +264,485 @@ private void generateMethods(List fields) throws Exception { finish(write, false, false); finish(read, true, marshallableMessage()); + + generateCacheObjectMethods(fields); + } + + /** IGNITE-28520 Phase 1: emits per-field CacheObject marshal hooks with recursion into nested {@code Message} types. */ + private void generateCacheObjectMethods(List orderedFields) throws Exception { + List traversable = new ArrayList<>(); + + for (VariableElement field: orderedFields) { + if (classify(field.asType()) != FieldKind.SKIP) + traversable.add(field); + } + + if (traversable.isEmpty()) + return; + + imports.add("org.apache.ignite.IgniteCheckedException"); + imports.add("org.apache.ignite.internal.processors.cache.CacheObjectValueContext"); + + startCacheObjectMethod(prepareCacheObjects, true); + startCacheObjectMethod(finishCacheObjects, false); + + indent++; + + for (VariableElement field: traversable) { + emitCacheObjectCall(prepareCacheObjects, field, true); + emitCacheObjectCall(finishCacheObjects, field, false); + } + + indent--; + + prepareCacheObjects.add(identedLine("}")); + finishCacheObjects.add(identedLine("}")); + + for (Map.Entry e : nestedSerializerFields.entrySet()) { + imports.add(e.getValue()); + + String serSimple = e.getValue().substring(e.getValue().lastIndexOf('.') + 1); + + fields.add("private final static " + serSimple + " " + e.getKey() + " = new " + serSimple + "();"); + } + } + + /** Traversal kinds for {@link #generateCacheObjectMethods}. */ + private enum FieldKind { + /** {@code CacheObject} / {@code KeyCacheObject} scalar. */ + CO, + /** {@code Collection} / {@code Collection}. */ + CO_COLL, + /** {@code CacheObject[]} / {@code KeyCacheObject[]}. */ + CO_ARR, + /** Nested concrete {@code Message}. */ + MSG, + /** {@code Collection} with concrete element type. */ + MSG_COLL, + /** {@code Message[]} with concrete component type. */ + MSG_ARR, + /** + * {@code Map} where at least one of {@code K}, {@code V} is a {@code CacheObject} or a recursable + * {@code Message}. Each traversable side is walked via {@link + * org.apache.ignite.internal.direct.stream.PendingMap#keysOf(Map)} / {@code valuesOf(Map)} so that + * receive-side {@code PendingMap}s are not materialized before the cache-object hook has run. + */ + MAP, + /** Skipped — abstract Message, cross-cache nested Message, Map with no traversable side, unsupported. */ + SKIP + } + + /** */ + private FieldKind classify(TypeMirror t) { + if (t.getKind() == TypeKind.ARRAY) { + TypeMirror comp = ((ArrayType)t).getComponentType(); + + if (comp.getKind() != TypeKind.DECLARED) + return FieldKind.SKIP; + + if (isCacheObjectType(comp)) + return FieldKind.CO_ARR; + + return isRecursableMessage(comp) ? FieldKind.MSG_ARR : FieldKind.SKIP; + } + + if (t.getKind() != TypeKind.DECLARED) + return FieldKind.SKIP; + + if (isCacheObjectType(t)) + return FieldKind.CO; + + if (assignableFrom(erasedType(t), type(Map.class.getName()))) { + List args = ((DeclaredType)t).getTypeArguments(); + + if (args.size() != 2) + return FieldKind.SKIP; + + FieldKind kSide = classifyMapSide(args.get(0)); + FieldKind vSide = classifyMapSide(args.get(1)); + + // No traversable key or value — nothing to do for CacheObject hooks. + if (kSide == FieldKind.SKIP && vSide == FieldKind.SKIP) + return FieldKind.SKIP; + + return FieldKind.MAP; + } + + if (assignableFrom(erasedType(t), type(Collection.class.getName()))) { + List args = ((DeclaredType)t).getTypeArguments(); + + if (args.size() != 1) + return FieldKind.SKIP; + + TypeMirror arg = args.get(0); + + if (arg.getKind() != TypeKind.DECLARED) + return FieldKind.SKIP; + + if (isCacheObjectType(arg)) + return FieldKind.CO_COLL; + + return isRecursableMessage(arg) ? FieldKind.MSG_COLL : FieldKind.SKIP; + } + + return isRecursableMessage(t) ? FieldKind.MSG : FieldKind.SKIP; + } + + /** Classifies one side (key or value) of a {@code Map} field: {@link FieldKind#CO}, {@link FieldKind#MSG}, or + * {@link FieldKind#SKIP}. */ + private FieldKind classifyMapSide(TypeMirror t) { + if (t.getKind() != TypeKind.DECLARED) + return FieldKind.SKIP; + + if (isCacheObjectType(t)) + return FieldKind.CO; + + return isRecursableMessage(t) ? FieldKind.MSG : FieldKind.SKIP; + } + + /** */ + private boolean isCacheObjectType(TypeMirror type) { + return assignableFrom(type, type("org.apache.ignite.internal.processors.cache.CacheObject")); + } + + /** + * True if {@code t} is a concrete non-abstract {@code Message} that is safe to recurse into from a parent's + * generated serializer: excludes the {@code Message} interface itself, abstract classes, self-reference, + * {@code MarshallableMessage} (its generated serializer has a non-default constructor), and nested messages + * that own an {@code @Order int cacheId} (cross-cache — their CO ctx must be resolved externally). + */ + private boolean isRecursableMessage(TypeMirror t) { + TypeMirror msgIface = type(MESSAGE_INTERFACE); + + if (msgIface == null || !assignableFrom(t, msgIface)) + return false; + + if (assignableFrom(t, marshallableMsgType)) + return false; + + Element el = env.getTypeUtils().asElement(t); + + if (!(el instanceof TypeElement)) + return false; + + TypeElement te = (TypeElement)el; + + if (te.getKind() != ElementKind.CLASS) + return false; + + if (te.getModifiers().contains(Modifier.ABSTRACT)) + return false; + + if (te.equals(type)) + return false; + + return !hasOwnCacheIdOrder(te); + } + + /** True if {@code te} declares an {@code @Order}-annotated {@code int cacheId} field (anywhere in its hierarchy). */ + private boolean hasOwnCacheIdOrder(TypeElement te) { + for (TypeElement cur = te; cur != null; ) { + for (Element enc : cur.getEnclosedElements()) { + if (enc.getKind() != ElementKind.FIELD) + continue; + + if (enc.getAnnotation(Order.class) == null) + continue; + + if (!"cacheId".contentEquals(enc.getSimpleName())) + continue; + + TypeMirror ft = enc.asType(); + + if (ft.getKind() == TypeKind.INT) + return true; + } + + Element sup = env.getTypeUtils().asElement(cur.getSuperclass()); + + cur = sup instanceof TypeElement ? (TypeElement)sup : null; + } + + return false; + } + + /** */ + private void startCacheObjectMethod(List code, boolean prepare) { + indent = 1; + + code.add(identedLine(METHOD_JAVADOC)); + + if (prepare) { + code.add(identedLine( + "@Override public void prepareMarshalCacheObjects(" + type.getSimpleName() + + " msg, CacheObjectValueContext ctx) throws IgniteCheckedException {")); + } + else { + code.add(identedLine( + "@Override public void finishUnmarshalCacheObjects(" + type.getSimpleName() + + " msg, CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException {")); + } + } + + /** */ + private void emitCacheObjectCall(List code, VariableElement field, boolean prepare) { + String accessor = fieldAccessor(field); + TypeMirror t = field.asType(); + FieldKind kind = classify(t); + + switch (kind) { + case CO: + emitCoDirect(code, accessor, prepare); + break; + + case CO_COLL: + case CO_ARR: + emitCoIterable(code, accessor, t, kind == FieldKind.CO_ARR, prepare); + break; + + case MSG: + emitMsgDirect(code, accessor, (DeclaredType)t, prepare); + break; + + case MSG_COLL: + emitMsgIterable(code, accessor, (DeclaredType)((DeclaredType)t).getTypeArguments().get(0), prepare); + break; + + case MSG_ARR: + emitMsgIterable(code, accessor, (DeclaredType)((ArrayType)t).getComponentType(), prepare); + break; + + case MAP: + emitMapTraversal(code, accessor, (DeclaredType)t, prepare); + break; + + default: + throw new IllegalStateException("Unexpected kind: " + kind); + } + } + + /** + * Emits traversal code for a {@code Map} field. One or both sides may be a {@code CacheObject} or a + * recursable {@code Message}; each traversable side is walked independently using {@link + * org.apache.ignite.internal.direct.stream.PendingMap#keysOf(Map)} / {@code valuesOf(Map)}. The helper + * dispatches between the receive-side {@code PendingMap} (iterates staged entries without materialization) + * and the send-side real map (delegates to {@code keySet()} / {@code values()}). + */ + private void emitMapTraversal(List code, String accessor, DeclaredType mapType, boolean prepare) { + List args = mapType.getTypeArguments(); + + TypeMirror keyT = args.get(0); + TypeMirror valT = args.get(1); + + FieldKind kSide = classifyMapSide(keyT); + FieldKind vSide = classifyMapSide(valT); + + imports.add("org.apache.ignite.internal.direct.stream.PendingMap"); + + code.add(identedLine("if (%s != null) {", accessor)); + + indent++; + + if (kSide != FieldKind.SKIP) + emitMapSideIteration(code, accessor, keyT, kSide, true, prepare); + + if (vSide != FieldKind.SKIP) + emitMapSideIteration(code, accessor, valT, vSide, false, prepare); + + indent--; + + code.add(identedLine("}")); + } + + /** + * Emits an iteration block over keys or values of a Map field, calling the appropriate CacheObject or + * nested-Message marshal hook on each element. + * + * @param code Target code buffer. + * @param accessor Field accessor expression (e.g. {@code msg.myMap}). + * @param sideType Declared type of the map side being traversed (key type or value type). + * @param sideKind {@link FieldKind#CO} or {@link FieldKind#MSG}. + * @param isKey {@code true} to iterate keys via {@code PendingMap.keysOf}, {@code false} for values. + * @param prepare {@code true} for {@code prepareMarshalCacheObjects}, {@code false} for + * {@code finishUnmarshalCacheObjects}. + */ + private void emitMapSideIteration( + List code, + String accessor, + TypeMirror sideType, + FieldKind sideKind, + boolean isKey, + boolean prepare + ) { + String helper = isKey ? "PendingMap.keysOf" : "PendingMap.valuesOf"; + String var = isKey ? "k" : "v"; + + if (sideKind == FieldKind.CO) { + String elementType = "org.apache.ignite.internal.processors.cache.KeyCacheObject"; + + if (!assignableFrom(sideType, type(elementType))) + elementType = "org.apache.ignite.internal.processors.cache.CacheObject"; + + imports.add(elementType); + + String simple = elementType.substring(elementType.lastIndexOf('.') + 1); + String mtd = prepare ? "prepareMarshal(ctx)" : "finishUnmarshal(ctx, ldr)"; + + code.add(identedLine("for (%s %s : %s(%s))", simple, var, helper, accessor)); + + indent++; + + code.add(identedLine("%s.%s;", var, mtd)); + + indent--; + } + else { + assert sideKind == FieldKind.MSG : sideKind; + + DeclaredType msgType = (DeclaredType)sideType; + String serRef = nestedSerializerRef(msgType); + String elemSimple = ((TypeElement)msgType.asElement()).getSimpleName().toString(); + + imports.add(((TypeElement)msgType.asElement()).getQualifiedName().toString()); + + String call = prepare ? "prepareMarshalCacheObjects(%s, ctx)" : "finishUnmarshalCacheObjects(%s, ctx, ldr)"; + + code.add(identedLine("for (%s %s : %s(%s))", elemSimple, var, helper, accessor)); + + indent++; + + code.add(identedLine("%s." + call + ";", serRef, var)); + + indent--; + } + } + + /** */ + private void emitCoDirect(List code, String accessor, boolean prepare) { + String mtd = prepare ? "prepareMarshal(ctx)" : "finishUnmarshal(ctx, ldr)"; + + code.add(identedLine("if (%s != null)", accessor)); + + indent++; + + code.add(identedLine("%s.%s;", accessor, mtd)); + + indent--; + } + + /** */ + private void emitCoIterable(List code, String accessor, TypeMirror t, boolean isArr, boolean prepare) { + String mtd = prepare ? "prepareMarshal(ctx)" : "finishUnmarshal(ctx, ldr)"; + + String elementType = "org.apache.ignite.internal.processors.cache.KeyCacheObject"; + + TypeMirror elem = isArr ? ((ArrayType)t).getComponentType() : ((DeclaredType)t).getTypeArguments().get(0); + + if (!assignableFrom(elem, type(elementType))) + elementType = "org.apache.ignite.internal.processors.cache.CacheObject"; + + imports.add(elementType); + + String simple = elementType.substring(elementType.lastIndexOf('.') + 1); + + code.add(identedLine("if (%s != null) {", accessor)); + + indent++; + + code.add(identedLine("for (%s obj : %s)", simple, accessor)); + + indent++; + + code.add(identedLine("obj.%s;", mtd)); + + indent -= 2; + + code.add(identedLine("}")); + } + + /** */ + private void emitMsgDirect(List code, String accessor, DeclaredType msgType, boolean prepare) { + String serRef = nestedSerializerRef(msgType); + + String call = prepare ? "prepareMarshalCacheObjects(%s, ctx)" : "finishUnmarshalCacheObjects(%s, ctx, ldr)"; + + code.add(identedLine("if (%s != null)", accessor)); + + indent++; + + code.add(identedLine("%s." + call + ";", serRef, accessor)); + + indent--; + } + + /** */ + private void emitMsgIterable(List code, String accessor, DeclaredType elemType, boolean prepare) { + String serRef = nestedSerializerRef(elemType); + + String elemSimple = ((TypeElement)elemType.asElement()).getSimpleName().toString(); + + imports.add(((TypeElement)elemType.asElement()).getQualifiedName().toString()); + + String call = prepare ? "prepareMarshalCacheObjects(e, ctx)" : "finishUnmarshalCacheObjects(e, ctx, ldr)"; + + code.add(identedLine("if (%s != null) {", accessor)); + + indent++; + + code.add(identedLine("for (%s e : %s)", elemSimple, accessor)); + + indent++; + + code.add(identedLine("%s.%s;", serRef, call)); + + indent -= 2; + + code.add(identedLine("}")); + } + + /** + * Returns the expression for a static instance of the nested message's generated serializer (e.g. + * {@code CacheInvokeDirectResultSerializer.INSTANCE}), registering the class to be emitted as a {@code private + * static final} field on the enclosing serializer. + */ + private String nestedSerializerRef(DeclaredType msgType) { + TypeElement el = (TypeElement)msgType.asElement(); + + String pkg = env.getElementUtils().getPackageOf(el).getQualifiedName().toString(); + String serSimple = el.getSimpleName() + "Serializer"; + String serFqn = pkg + "." + serSimple; + + String varName = camelToConstant(el.getSimpleName().toString()) + "_SER"; + + nestedSerializerFields.put(varName, serFqn); + + return varName; + } + + /** Converts {@code CacheInvokeDirectResult} to {@code CACHE_INVOKE_DIRECT_RESULT}. */ + private static String camelToConstant(String simple) { + StringBuilder sb = new StringBuilder(simple.length() + 8); + + for (int i = 0; i < simple.length(); i++) { + char c = simple.charAt(i); + + if (i > 0 && Character.isUpperCase(c) && !Character.isUpperCase(simple.charAt(i - 1))) + sb.append('_'); + + sb.append(Character.toUpperCase(c)); + } + + return sb.toString(); + } + + /** */ + private String fieldAccessor(VariableElement field) { + String name = field.getSimpleName().toString(); + + if (type.equals(field.getEnclosingElement())) + return "msg." + name; + + return "((" + field.getEnclosingElement().getSimpleName() + ")msg)." + name; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java index 053732880b6ec..17771024caa47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java @@ -293,8 +293,14 @@ public class DirectByteBufferStream { /** */ private Collection col; - /** */ - private Map map; + /** + * Staging buffer built on the NIO thread by {@link #readMap}. Deferred assembly — the real + * {@link java.util.HashMap} / {@link java.util.LinkedHashMap} is constructed on the user thread when the + * consumer first accesses the map (or explicitly via {@link PendingMap#materialize()}). This avoids calling + * {@code hashCode()} on keys whose hash is not yet stable (for example {@code KeyCacheObject} before + * {@code finishUnmarshal} has run). + */ + private PendingMap pendingMap; /** */ private long prim; @@ -1693,8 +1699,8 @@ public > C readCollection(MessageCollectionType type, Me } if (readSize >= 0) { - if (map == null) - map = type.linked() ? U.newLinkedHashMap(readSize) : U.newHashMap(readSize); + if (pendingMap == null) + pendingMap = new PendingMap<>(readSize, type.linked()); for (int i = readItems; i < readSize; i++) { if (!keyDone) { @@ -1712,7 +1718,7 @@ public > C readCollection(MessageCollectionType type, Me if (!lastFinished) return null; - map.put(mapCur, val); + pendingMap.addRaw(mapCur, val); keyDone = false; @@ -1724,9 +1730,9 @@ public > C readCollection(MessageCollectionType type, Me readItems = 0; mapCur = null; - M map0 = (M)map; + M map0 = (M)pendingMap; - map = null; + pendingMap = null; return map0; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/PendingMap.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/PendingMap.java new file mode 100644 index 0000000000000..dad677e0539f4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/PendingMap.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.direct.stream; + +import java.io.ObjectStreamException; +import java.io.Serializable; +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * {@link Map} wrapper returned by {@link DirectByteBufferStream#readMap} while a message is being demarshaled + * on the NIO thread. + * + *

Instead of assembling a {@link java.util.HashMap} on the NIO thread — which is unsafe when the keys are + * {@link org.apache.ignite.internal.processors.cache.KeyCacheObject}s whose {@code hashCode()} is only stable + * once {@link org.apache.ignite.internal.processors.cache.CacheObject#finishUnmarshal} has run — this class + * accumulates {@code (key, value)} pairs into parallel arrays and defers construction of the real hash map + * until the first access. That first access happens on the user thread after {@code finishUnmarshal} has been + * applied to every {@code CacheObject} key/value, so bucket placement is correct. + * + *

Generated serializer code for an {@code @Order Map} field walks the pending contents via + * {@link #stagedKeys()} / {@link #stagedValues()} so the traversal does not trigger materialization + * prematurely. + */ +public final class PendingMap extends AbstractMap implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Minimum capacity of the staging arrays. */ + private static final int MIN_CAP = 4; + + /** Whether to produce a {@link java.util.LinkedHashMap} on materialization. */ + private final boolean linked; + + /** Staged keys; nulled out after {@link #materialize()}. */ + private Object[] keys; + + /** Staged values; nulled out after {@link #materialize()}. */ + private Object[] vals; + + /** Number of staged entries. */ + private int size; + + /** Materialized view; {@code null} until {@link #materialize()} runs. */ + private Map real; + + /** + * @param expSize Expected number of entries (hint for initial array capacity). + * @param linked Whether the materialized map should preserve insertion order. + */ + public PendingMap(int expSize, boolean linked) { + this.linked = linked; + + int cap = Math.max(expSize, MIN_CAP); + + keys = new Object[cap]; + vals = new Object[cap]; + } + + /** + * Appends a raw {@code (key, value)} pair to the staging arrays. Does not invoke {@code key.hashCode()} or + * {@code key.equals()}, so the method is safe to call on the NIO thread before any CacheObject key has had + * {@code finishUnmarshal} applied. + * + * @param k Key. + * @param v Value. + */ + public void addRaw(K k, V v) { + if (size == keys.length) { + int newCap = keys.length << 1; + + keys = Arrays.copyOf(keys, newCap); + vals = Arrays.copyOf(vals, newCap); + } + + keys[size] = k; + vals[size] = v; + + size++; + } + + /** + * Builds the real {@link java.util.HashMap} (or {@link java.util.LinkedHashMap}) from the staged entries. + * Idempotent. After the first call the staging arrays are discarded and all subsequent {@link Map} accesses + * hit the materialized instance. + * + * @return The materialized map. + */ + @SuppressWarnings("unchecked") + public Map materialize() { + if (real == null) { + Map m = linked ? U.newLinkedHashMap(size) : U.newHashMap(size); + + for (int i = 0; i < size; i++) + m.put((K)keys[i], (V)vals[i]); + + real = m; + keys = null; + vals = null; + } + + return real; + } + + /** + * Iterates staged keys without triggering {@link #materialize()}. Intended for consumption by generated + * {@code prepareMarshalCacheObjects} / {@code finishUnmarshalCacheObjects} code that needs to walk + * {@link org.apache.ignite.internal.processors.cache.CacheObject} key entries before the hash map is built. + * If the map has already been materialized, falls back to iterating the materialized key set. + * + * @return Iterable of staged keys. + */ + public Iterable stagedKeys() { + if (real != null) + return real.keySet(); + + return stagedIterable(keys, size); + } + + /** + * Iterates staged values without triggering {@link #materialize()}. If the map has already been materialized, + * falls back to iterating the materialized values collection. + * + * @return Iterable of staged values. + */ + public Iterable stagedValues() { + if (real != null) + return real.values(); + + return stagedIterable(vals, size); + } + + /** {@inheritDoc} */ + @Override public Set> entrySet() { + return materialize().entrySet(); + } + + /** {@inheritDoc} */ + @Override public int size() { + return real != null ? real.size() : size; + } + + /** {@inheritDoc} */ + @Override public boolean isEmpty() { + return size() == 0; + } + + /** {@inheritDoc} */ + @Override public V get(Object key) { + return materialize().get(key); + } + + /** {@inheritDoc} */ + @Override public V put(K key, V val) { + return materialize().put(key, val); + } + + /** {@inheritDoc} */ + @Override public V remove(Object key) { + return materialize().remove(key); + } + + /** {@inheritDoc} */ + @Override public void putAll(Map m) { + materialize().putAll(m); + } + + /** {@inheritDoc} */ + @Override public void clear() { + materialize().clear(); + } + + /** {@inheritDoc} */ + @Override public boolean containsKey(Object key) { + return materialize().containsKey(key); + } + + /** {@inheritDoc} */ + @Override public boolean containsValue(Object val) { + return materialize().containsValue(val); + } + + /** {@inheritDoc} */ + @Override public Set keySet() { + return materialize().keySet(); + } + + /** {@inheritDoc} */ + @Override public Collection values() { + return materialize().values(); + } + + /** + * Serializes as the underlying {@link java.util.HashMap} / {@link java.util.LinkedHashMap}, not as + * {@code PendingMap}. {@code PendingMap} is a wire-protocol staging detail — it should not appear in + * JDK-serialized forms (cache metadata, discovery data, etc.), where readers may not have the class on + * their classpath and where {@link Serializable} semantics are expected to yield a plain map. + * + * @return Materialized map. + * @throws ObjectStreamException Never. + */ + private Object writeReplace() throws ObjectStreamException { + return materialize(); + } + + /** + * Returns an {@link Iterable} over the keys of {@code m} without triggering materialization if {@code m} is a + * {@link PendingMap}. For any other {@link Map} implementation delegates to {@link Map#keySet()}. + * + *

Intended to be called from APT-generated {@code prepareMarshalCacheObjects} / {@code + * finishUnmarshalCacheObjects}. Generated code does not know at compile time whether the field holds a + * pending instance (receive-side) or a real map (send-side); this helper dispatches uniformly. + * + * @param m Map. + * @param Key type. + * @return Iterable over keys. + */ + @SuppressWarnings("unchecked") + public static Iterable keysOf(Map m) { + if (m instanceof PendingMap) + return ((PendingMap)m).stagedKeys(); + + return m.keySet(); + } + + /** + * Returns an {@link Iterable} over the values of {@code m} without triggering materialization if {@code m} is a + * {@link PendingMap}. For any other {@link Map} implementation delegates to {@link Map#values()}. + * + * @param m Map. + * @param Value type. + * @return Iterable over values. + */ + @SuppressWarnings("unchecked") + public static Iterable valuesOf(Map m) { + if (m instanceof PendingMap) + return ((PendingMap)m).stagedValues(); + + return m.values(); + } + + /** */ + @SuppressWarnings("unchecked") + private static Iterable stagedIterable(Object[] arr, int sz) { + if (sz == 0) + return Collections.emptyList(); + + return () -> new Iterator() { + private int i; + + @Override public boolean hasNext() { + return i < sz; + } + + @Override public E next() { + if (i >= sz) + throw new NoSuchElementException(); + + return (E)arr[i++]; + } + }; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java index 1276c6b63c82c..5996ac96d0856 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java @@ -19,7 +19,6 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.communication.ErrorMessage; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -111,19 +110,6 @@ public CacheObject result() { return ErrorMessage.error(errMsg); } - /** - * @param ctx Cache context. - * @throws IgniteCheckedException If failed. - */ - public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { - key.prepareMarshal(ctx.cacheObjectContext()); - - assert unprepareRes == null : "marshalResult() was not called for the result: " + this; - - if (res != null) - res.prepareMarshal(ctx.cacheObjectContext()); - } - /** * Converts the entry processor unprepared result to a cache object instance. * @@ -139,18 +125,6 @@ public void marshalResult(GridCacheContext ctx) { } } - /** - * @param ctx Cache context. - * @param ldr Class loader. - * @throws IgniteCheckedException If failed. - */ - public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { - key.finishUnmarshal(ctx.cacheObjectContext(), ldr); - - if (res != null) - res.finishUnmarshal(ctx.cacheObjectContext(), ldr); - } - /** {@inheritDoc} */ @Override public String toString() { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 8d0ffbb168f16..d68e81197848a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -91,6 +91,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -1150,6 +1151,8 @@ private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws I if (destNodeId == null || !cctx.localNodeId().equals(destNodeId)) { msg.prepareMarshal(cctx); + prepareMarshalGeneratedCacheObjects(msg); + if (msg instanceof GridCacheDeployable && msg.addDeploymentInfo()) cctx.deploy().prepare((GridCacheDeployable)msg); } @@ -1157,6 +1160,40 @@ private boolean onSend(GridCacheMessage msg, @Nullable UUID destNodeId) throws I return true; } + /** IGNITE-28520 Phase 1: invokes the generated per-field {@code CacheObject.prepareMarshal} on the user thread. */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private void prepareMarshalGeneratedCacheObjects(GridCacheMessage msg) throws IgniteCheckedException { + if (!(msg instanceof GridCacheIdMessage)) + return; + + CacheObjectContext cacheObjCtx = cctx.cacheObjectContext(((GridCacheIdMessage)msg).cacheId()); + + if (cacheObjCtx == null) + return; + + MessageSerializer ser = cctx.gridIO().messageFactory().serializer(msg.directType()); + + if (ser != null) + ser.prepareMarshalCacheObjects(msg, cacheObjCtx); + } + + /** Receive-side mirror of {@link #prepareMarshalGeneratedCacheObjects}. */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private void finishUnmarshalGeneratedCacheObjects(GridCacheMessage msg, ClassLoader ldr) throws IgniteCheckedException { + if (!(msg instanceof GridCacheIdMessage)) + return; + + CacheObjectContext cacheObjCtx = cctx.cacheObjectContext(((GridCacheIdMessage)msg).cacheId()); + + if (cacheObjCtx == null) + return; + + MessageSerializer ser = cctx.gridIO().messageFactory().serializer(msg.directType()); + + if (ser != null) + ser.finishUnmarshalCacheObjects(msg, cacheObjCtx, ldr); + } + /** * @param nodeId Node ID. * @param sndErr Send error. @@ -1604,7 +1641,11 @@ private void unmarshall(UUID nodeId, GridCacheMessage cacheMsg) { log.debug("Set P2P context [senderId=" + nodeId + ", msg=" + cacheMsg + ']'); } - cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader()); + ClassLoader ldr = cctx.deploy().globalLoader(); + + cacheMsg.finishUnmarshal(cctx, ldr); + + finishUnmarshalGeneratedCacheObjects(cacheMsg, ldr); } catch (IgniteCheckedException e) { cacheMsg.onClassError(e); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java index 7c78a10a5cf48..4afd8371a0746 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java @@ -38,6 +38,18 @@ * Return value for cases where both, value and success flag need to be returned. */ public class GridCacheReturn implements Message { + /** + * Delegate for {@code CacheObject} marshal/unmarshal traversal (generated by APT). + * + * FIXME IGNITE-28520 Phase 2: remove this field together with {@link #prepareMarshal(GridCacheContext)} and + * {@link #finishUnmarshal(GridCacheContext, ClassLoader)} once the communication framework invokes + * {@code MessageSerializer#prepareMarshalCacheObjects} / {@code MessageSerializer#finishUnmarshalCacheObjects} + * directly on the user thread (before NIO handoff on send, after demarshal on receive). Until then we delegate + * {@code CacheObject} traversal to the generated serializer to avoid duplicating logic, accepting a transient + * source → generated-source dependency. + */ + private static final GridCacheReturnSerializer SER = new GridCacheReturnSerializer(); + /** Value. */ @GridToStringInclude(sensitive = true) private volatile Object v; @@ -331,37 +343,34 @@ public void marshalResult(GridCacheContext ctx) { /** * @param ctx Cache context. * @throws IgniteCheckedException If failed. + * + * FIXME IGNITE-28520 Phase 2: drop this method once the communication layer calls + * {@link GridCacheReturnSerializer#prepareMarshalCacheObjects} automatically on the user thread. */ public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { assert !loc; - if (cacheObj != null) - cacheObj.prepareMarshal(ctx.cacheObjectContext()); - - if (invokeRes && invokeResCol != null) { - for (CacheInvokeDirectResult res : invokeResCol) - res.prepareMarshal(ctx); - } + SER.prepareMarshalCacheObjects(this, ctx.cacheObjectContext()); } /** * @param ctx Cache context. * @param ldr Class loader. * @throws IgniteCheckedException If failed. + * + * FIXME IGNITE-28520 Phase 2: drop the {@link #SER} delegation once the communication layer calls + * {@link GridCacheReturnSerializer#finishUnmarshalCacheObjects} automatically on the user thread. + * The post-traversal binary unwrapping below stays here (it is not part of the serializer contract). */ public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { loc = true; - if (cacheObj != null) { - cacheObj.finishUnmarshal(ctx.cacheObjectContext(), ldr); + SER.finishUnmarshalCacheObjects(this, ctx.cacheObjectContext(), ldr); + if (cacheObj != null) v = ctx.cacheObjectContext().unwrapBinaryIfNeeded(cacheObj, true, false, ldr); - } if (invokeRes && invokeResCol != null) { - for (CacheInvokeDirectResult res : invokeResCol) - res.finishUnmarshal(ctx, ldr); - Map map0 = U.newHashMap(invokeResCol.size()); for (CacheInvokeDirectResult res : invokeResCol) { diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageSerializer.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageSerializer.java index 90df0601693c3..732a4ef29e4d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageSerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageSerializer.java @@ -17,9 +17,10 @@ package org.apache.ignite.plugin.extensions.communication; -/** - * Interface for message serialization logic. - */ +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; + +/** Message serialization logic. */ public interface MessageSerializer { /** * Writes this message to provided byte buffer. @@ -38,4 +39,33 @@ public interface MessageSerializer { * @return Whether message was fully read. */ public boolean readFrom(M msg, MessageReader reader); + + /** + * Phase 1 of two-phase marshalling (IGNITE-28520): invoked on the user thread before the message is + * handed off to the NIO worker. The generated implementation walks {@code @Order}-annotated + * {@code CacheObject}/{@code KeyCacheObject} fields (direct, {@code Collection<>}, and array — single level, + * no maps, no recursion into nested messages) and calls {@code CacheObject.prepareMarshal} on each, so the NIO + * thread never does it. Default is a no-op. + * + * @param msg Message instance. + * @param ctx Cache object value context. + * @throws IgniteCheckedException If marshalling fails. + */ + public default void prepareMarshalCacheObjects(M msg, CacheObjectValueContext ctx) throws IgniteCheckedException { + // No-op by default. + } + + /** + * Receive-side mirror of {@link #prepareMarshalCacheObjects}: called on a user (listener-dispatch) thread to + * run {@code CacheObject.finishUnmarshal} for the same set of fields. Default is a no-op. + * + * @param msg Message instance. + * @param ctx Cache object value context. + * @param ldr Class loader used to resolve unmarshalled classes. + * @throws IgniteCheckedException If unmarshalling fails. + */ + public default void finishUnmarshalCacheObjects(M msg, CacheObjectValueContext ctx, ClassLoader ldr) + throws IgniteCheckedException { + // No-op by default. + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java index ee30d719dccc3..2a8433f985e43 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java @@ -305,6 +305,51 @@ public void testCompressedMessageExplicitUsageFails() { assertThat(compilation).hadErrorContaining(errMsg); } + /** + * Positive test for the safe replacement of an {@code @Order Map} field: + * a parent {@code Message} transmits a {@code Collection}, where {@code Entry} is a small + * {@code Message} holding {@code (KeyCacheObject key, Message val)}. The generator must emit recursion into + * every entry so that {@code KeyCacheObject#finishUnmarshal} runs on the user thread before application code + * reassembles a {@code HashMap} from the collection — the moment at which {@code KeyCacheObject.hashCode} + * becomes stable. + */ + @Test + public void testKeyCacheObjectInCollectionOfEntries() { + Compilation compilation = compile("KeyCacheObjectEntryMsg.java", "TestKeyCacheObjectCollectionMessage.java"); + + assertThat(compilation).succeeded(); + + assertEquals(2, compilation.generatedSourceFiles().size()); + + assertThat(compilation) + .generatedSourceFile("org.apache.ignite.internal.KeyCacheObjectEntryMsgSerializer") + .hasSourceEquivalentTo(javaFile("KeyCacheObjectEntryMsgSerializer.java")); + + assertThat(compilation) + .generatedSourceFile("org.apache.ignite.internal.TestKeyCacheObjectCollectionMessageSerializer") + .hasSourceEquivalentTo(javaFile("TestKeyCacheObjectCollectionMessageSerializer.java")); + } + + /** + * Positive test for {@code @Order Map}: the generator emits a standard + * {@code writeMap} / {@code readMap} pair and — crucially — a {@code finishUnmarshalCacheObjects} that walks + * keys via {@code PendingMap.keysOf} and values via {@code PendingMap.valuesOf}, calling + * {@code KeyCacheObject#finishUnmarshal} and the nested {@code GridCacheVersionSerializer} on each element. + * The staging {@link org.apache.ignite.internal.direct.stream.PendingMap} defers real {@code HashMap} + * assembly to the user thread, so by the time the consumer first queries the map every key's + * {@code hashCode} is already stable. + */ + @Test + public void testMapWithKeyCacheObjectAndMessageValue() { + Compilation compilation = compile("TestMapKeyCacheObjectMessage.java"); + + assertThat(compilation).succeeded(); + + assertThat(compilation) + .generatedSourceFile("org.apache.ignite.internal.TestMapKeyCacheObjectMessageSerializer") + .hasSourceEquivalentTo(javaFile("TestMapKeyCacheObjectMessageSerializer.java")); + } + /** * Negative test that verifies the compilation failed if the Compress annotation is used for unsupported types. */ diff --git a/modules/core/src/test/resources/codegen/KeyCacheObjectEntryMsg.java b/modules/core/src/test/resources/codegen/KeyCacheObjectEntryMsg.java new file mode 100644 index 0000000000000..707801a027cd4 --- /dev/null +++ b/modules/core/src/test/resources/codegen/KeyCacheObjectEntryMsg.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Test entry Message used by {@link TestKeyCacheObjectCollectionMessage} to demonstrate the safe substitute for + * an {@code @Order Map} field. Each instance represents one logical {@code (key, value)} pair; + * the parent Message transmits them as a {@code Collection}. + */ +public class KeyCacheObjectEntryMsg implements Message { + @Order(0) + KeyCacheObject key; + + @Order(1) + GridCacheVersion val; + + public short directType() { + return 0; + } +} diff --git a/modules/core/src/test/resources/codegen/KeyCacheObjectEntryMsgSerializer.java b/modules/core/src/test/resources/codegen/KeyCacheObjectEntryMsgSerializer.java new file mode 100644 index 0000000000000..8c2a90a113596 --- /dev/null +++ b/modules/core/src/test/resources/codegen/KeyCacheObjectEntryMsgSerializer.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.KeyCacheObjectEntryMsg; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionSerializer; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * This class is generated automatically. + * + * @see org.apache.ignite.internal.MessageProcessor + */ +public class KeyCacheObjectEntryMsgSerializer implements MessageSerializer { + /** */ + private final static GridCacheVersionSerializer GRID_CACHE_VERSION_SER = new GridCacheVersionSerializer(); + + /** */ + @Override public boolean writeTo(KeyCacheObjectEntryMsg msg, MessageWriter writer) { + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(msg.directType())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeKeyCacheObject(msg.key)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage(msg.val)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** */ + @Override public boolean readFrom(KeyCacheObjectEntryMsg msg, MessageReader reader) { + switch (reader.state()) { + case 0: + msg.key = reader.readKeyCacheObject(); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + msg.val = reader.readMessage(); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return true; + } + + /** */ + @Override public void prepareMarshalCacheObjects(KeyCacheObjectEntryMsg msg, CacheObjectValueContext ctx) throws IgniteCheckedException { + if (msg.key != null) + msg.key.prepareMarshal(ctx); + if (msg.val != null) + GRID_CACHE_VERSION_SER.prepareMarshalCacheObjects(msg.val, ctx); + } + + /** */ + @Override public void finishUnmarshalCacheObjects(KeyCacheObjectEntryMsg msg, CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException { + if (msg.key != null) + msg.key.finishUnmarshal(ctx, ldr); + if (msg.val != null) + GRID_CACHE_VERSION_SER.finishUnmarshalCacheObjects(msg.val, ctx, ldr); + } +} diff --git a/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java b/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java index 329f334337488..bca7dd7cba630 100644 --- a/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java +++ b/modules/core/src/test/resources/codegen/TestCollectionsMessageSerializer.java @@ -17,7 +17,11 @@ package org.apache.ignite.internal; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.TestCollectionsMessage; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionSerializer; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; import org.apache.ignite.plugin.extensions.communication.MessageItemType; @@ -31,6 +35,8 @@ * @see org.apache.ignite.internal.MessageProcessor */ public class TestCollectionsMessageSerializer implements MessageSerializer { + /** */ + private final static GridCacheVersionSerializer GRID_CACHE_VERSION_SER = new GridCacheVersionSerializer(); /** */ private final static MessageCollectionType affTopVersionListCollDesc = new MessageCollectionType(new MessageItemType(MessageCollectionItemType.AFFINITY_TOPOLOGY_VERSION), false); /** */ @@ -452,4 +458,20 @@ public class TestCollectionsMessageSerializer implements MessageSerializer} field. + *

+ * Instead of a {@code Map}, the wire representation is a {@code Collection}. The + * APT-generated serializer does not assemble any {@code HashMap} on the NIO thread, so the fact that + * {@code KeyCacheObject.hashCode} is unstable until {@link org.apache.ignite.internal.processors.cache.CacheObject#finishUnmarshal} + * has run is harmless. The generated {@code finishUnmarshalCacheObjects} walks every entry and calls + * {@code KeyCacheObject#finishUnmarshal} on the user thread — so by the time the application code reassembles a + * {@code HashMap} from {@link #entries}, every key's hashCode is already stable. + */ +public class TestKeyCacheObjectCollectionMessage implements Message { + @Order(0) + Collection entries; + + public short directType() { + return 1; + } +} diff --git a/modules/core/src/test/resources/codegen/TestKeyCacheObjectCollectionMessageSerializer.java b/modules/core/src/test/resources/codegen/TestKeyCacheObjectCollectionMessageSerializer.java new file mode 100644 index 0000000000000..d00ca0cff175a --- /dev/null +++ b/modules/core/src/test/resources/codegen/TestKeyCacheObjectCollectionMessageSerializer.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.KeyCacheObjectEntryMsg; +import org.apache.ignite.internal.KeyCacheObjectEntryMsgSerializer; +import org.apache.ignite.internal.TestKeyCacheObjectCollectionMessage; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; +import org.apache.ignite.plugin.extensions.communication.MessageItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * This class is generated automatically. + * + * @see org.apache.ignite.internal.MessageProcessor + */ +public class TestKeyCacheObjectCollectionMessageSerializer implements MessageSerializer { + /** */ + private final static KeyCacheObjectEntryMsgSerializer KEY_CACHE_OBJECT_ENTRY_MSG_SER = new KeyCacheObjectEntryMsgSerializer(); + /** */ + private final static MessageCollectionType entriesCollDesc = new MessageCollectionType(new MessageItemType(MessageCollectionItemType.MSG), false); + + /** */ + @Override public boolean writeTo(TestKeyCacheObjectCollectionMessage msg, MessageWriter writer) { + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(msg.directType())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeCollection(msg.entries, entriesCollDesc)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** */ + @Override public boolean readFrom(TestKeyCacheObjectCollectionMessage msg, MessageReader reader) { + switch (reader.state()) { + case 0: + msg.entries = reader.readCollection(entriesCollDesc); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return true; + } + + /** */ + @Override public void prepareMarshalCacheObjects(TestKeyCacheObjectCollectionMessage msg, CacheObjectValueContext ctx) throws IgniteCheckedException { + if (msg.entries != null) { + for (KeyCacheObjectEntryMsg e : msg.entries) + KEY_CACHE_OBJECT_ENTRY_MSG_SER.prepareMarshalCacheObjects(e, ctx); + } + } + + /** */ + @Override public void finishUnmarshalCacheObjects(TestKeyCacheObjectCollectionMessage msg, CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException { + if (msg.entries != null) { + for (KeyCacheObjectEntryMsg e : msg.entries) + KEY_CACHE_OBJECT_ENTRY_MSG_SER.finishUnmarshalCacheObjects(e, ctx, ldr); + } + } +} diff --git a/modules/core/src/test/resources/codegen/TestMapKeyCacheObjectMessage.java b/modules/core/src/test/resources/codegen/TestMapKeyCacheObjectMessage.java new file mode 100644 index 0000000000000..882d77300faf8 --- /dev/null +++ b/modules/core/src/test/resources/codegen/TestMapKeyCacheObjectMessage.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.Map; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Test Message demonstrating that {@code @Order Map} is safe to use as-is. + *

+ * The APT-generated {@code readFrom} leaves {@code entries} as a + * {@link org.apache.ignite.internal.direct.stream.PendingMap} on the NIO thread (no {@code hashCode} + * invocations on keys). The generated {@code finishUnmarshalCacheObjects} walks the staged keys and values + * via {@code PendingMap.keysOf} / {@code PendingMap.valuesOf} and calls {@code KeyCacheObject#finishUnmarshal} + * / the nested {@code GridCacheVersionSerializer#finishUnmarshalCacheObjects} on each — so by the time user + * code accesses the map and triggers real {@code HashMap} assembly, every key's {@code hashCode} is stable. + */ +public class TestMapKeyCacheObjectMessage implements Message { + @Order(0) + Map entries; + + public short directType() { + return 0; + } +} diff --git a/modules/core/src/test/resources/codegen/TestMapKeyCacheObjectMessageSerializer.java b/modules/core/src/test/resources/codegen/TestMapKeyCacheObjectMessageSerializer.java new file mode 100644 index 0000000000000..70dfb824f6db8 --- /dev/null +++ b/modules/core/src/test/resources/codegen/TestMapKeyCacheObjectMessageSerializer.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.TestMapKeyCacheObjectMessage; +import org.apache.ignite.internal.direct.stream.PendingMap; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionSerializer; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageItemType; +import org.apache.ignite.plugin.extensions.communication.MessageMapType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * This class is generated automatically. + * + * @see org.apache.ignite.internal.MessageProcessor + */ +public class TestMapKeyCacheObjectMessageSerializer implements MessageSerializer { + /** */ + private final static GridCacheVersionSerializer GRID_CACHE_VERSION_SER = new GridCacheVersionSerializer(); + /** */ + private static final MessageMapType entriesCollDesc = new MessageMapType(new MessageItemType(MessageCollectionItemType.KEY_CACHE_OBJECT), new MessageItemType(MessageCollectionItemType.MSG), false); + + /** */ + @Override public boolean writeTo(TestMapKeyCacheObjectMessage msg, MessageWriter writer) { + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(msg.directType())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMap(msg.entries, entriesCollDesc)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** */ + @Override public boolean readFrom(TestMapKeyCacheObjectMessage msg, MessageReader reader) { + switch (reader.state()) { + case 0: + msg.entries = reader.readMap(entriesCollDesc); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return true; + } + + /** */ + @Override public void prepareMarshalCacheObjects(TestMapKeyCacheObjectMessage msg, CacheObjectValueContext ctx) throws IgniteCheckedException { + if (msg.entries != null) { + for (KeyCacheObject k : PendingMap.keysOf(msg.entries)) + k.prepareMarshal(ctx); + for (GridCacheVersion v : PendingMap.valuesOf(msg.entries)) + GRID_CACHE_VERSION_SER.prepareMarshalCacheObjects(v, ctx); + } + } + + /** */ + @Override public void finishUnmarshalCacheObjects(TestMapKeyCacheObjectMessage msg, CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException { + if (msg.entries != null) { + for (KeyCacheObject k : PendingMap.keysOf(msg.entries)) + k.finishUnmarshal(ctx, ldr); + for (GridCacheVersion v : PendingMap.valuesOf(msg.entries)) + GRID_CACHE_VERSION_SER.finishUnmarshalCacheObjects(v, ctx, ldr); + } + } +} diff --git a/modules/core/src/test/resources/codegen/TestMapMessageSerializer.java b/modules/core/src/test/resources/codegen/TestMapMessageSerializer.java index b2e66053653df..54f7d5c6f0853 100644 --- a/modules/core/src/test/resources/codegen/TestMapMessageSerializer.java +++ b/modules/core/src/test/resources/codegen/TestMapMessageSerializer.java @@ -17,7 +17,12 @@ package org.apache.ignite.internal; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.TestMapMessage; +import org.apache.ignite.internal.direct.stream.PendingMap; +import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersionSerializer; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageCollectionType; import org.apache.ignite.plugin.extensions.communication.MessageItemType; @@ -32,6 +37,8 @@ * @see org.apache.ignite.internal.MessageProcessor */ public class TestMapMessageSerializer implements MessageSerializer { + /** */ + private final static GridCacheVersionSerializer GRID_CACHE_VERSION_SER = new GridCacheVersionSerializer(); /** */ private final static MessageMapType affTopVersionIgniteUuidMapCollDesc = new MessageMapType(new MessageItemType(MessageCollectionItemType.AFFINITY_TOPOLOGY_VERSION), new MessageItemType(MessageCollectionItemType.IGNITE_UUID), false); /** */ @@ -453,4 +460,20 @@ public class TestMapMessageSerializer implements MessageSerializer { + /** */ + private final static GridCacheVersionSerializer GRID_CACHE_VERSION_SER = new GridCacheVersionSerializer(); /** */ private final static MessageArrayType intMatrixCollDesc = new MessageArrayType(new MessageItemType(MessageCollectionItemType.INT_ARR), int[].class); /** */ @@ -269,4 +274,32 @@ public class TestMessageSerializer implements MessageSerializer { return true; } + + /** */ + @Override public void prepareMarshalCacheObjects(TestMessage msg, CacheObjectValueContext ctx) throws IgniteCheckedException { + if (msg.ver != null) + GRID_CACHE_VERSION_SER.prepareMarshalCacheObjects(msg.ver, ctx); + if (msg.verArr != null) { + for (GridCacheVersion e : msg.verArr) + GRID_CACHE_VERSION_SER.prepareMarshalCacheObjects(e, ctx); + } + if (msg.keyCacheObject != null) + msg.keyCacheObject.prepareMarshal(ctx); + if (msg.cacheObject != null) + msg.cacheObject.prepareMarshal(ctx); + } + + /** */ + @Override public void finishUnmarshalCacheObjects(TestMessage msg, CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException { + if (msg.ver != null) + GRID_CACHE_VERSION_SER.finishUnmarshalCacheObjects(msg.ver, ctx, ldr); + if (msg.verArr != null) { + for (GridCacheVersion e : msg.verArr) + GRID_CACHE_VERSION_SER.finishUnmarshalCacheObjects(e, ctx, ldr); + } + if (msg.keyCacheObject != null) + msg.keyCacheObject.finishUnmarshal(ctx, ldr); + if (msg.cacheObject != null) + msg.cacheObject.finishUnmarshal(ctx, ldr); + } } \ No newline at end of file