Skip to content

Commit bcf6f02

Browse files
authored
IGNITE-28221 : combining of the message factories v2 (#13003)
1 parent dd1b439 commit bcf6f02

File tree

90 files changed

+967
-1390
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+967
-1390
lines changed

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/message/CalciteCommunicationMessageSerializationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.message;
1919

20-
import org.apache.ignite.internal.managers.communication.AbstractMessageSerializationTest;
20+
import org.apache.ignite.internal.managers.AbstractMessageSerializationTest;
2121
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
2222

2323
/** */

modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

Lines changed: 696 additions & 0 deletions
Large diffs are not rendered by default.

modules/core/src/main/java/org/apache/ignite/internal/ExchangeInfo.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.util.List;
2121
import java.util.Objects;
22-
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
2322
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
2423
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
2524
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -31,7 +30,7 @@ public final class ExchangeInfo extends IgniteDiagnosticRequest.DiagnosticBaseIn
3130
AffinityTopologyVersion topVer;
3231

3332
/**
34-
* Empty constructor required by {@link GridIoMessageFactory}.
33+
* Empty constructor required by {@link CoreMessagesProvider}.
3534
*/
3635
public ExchangeInfo() {
3736
// No-op.

modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.apache.ignite.maintenance.MaintenanceRegistry;
8585
import org.apache.ignite.plugin.PluginNotFoundException;
8686
import org.apache.ignite.plugin.PluginProvider;
87+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
8788

8889
/**
8990
*
@@ -209,6 +210,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
209210
*/
210211
public MaintenanceRegistry maintenanceRegistry();
211212

213+
/**
214+
* Gets core message factoy.
215+
*
216+
* @return Core message factory.
217+
*/
218+
public MessageFactory messageFactory();
219+
212220
/**
213221
* Gets transformation processor.
214222
*

modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
import org.apache.ignite.maintenance.MaintenanceRegistry;
115115
import org.apache.ignite.plugin.PluginNotFoundException;
116116
import org.apache.ignite.plugin.PluginProvider;
117+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
117118
import org.jetbrains.annotations.Nullable;
118119

119120
import static org.apache.ignite.internal.IgniteComponentType.SPRING;
@@ -371,17 +372,14 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
371372
private Thread.UncaughtExceptionHandler hnd;
372373

373374
/** */
374-
private IgniteEx grid;
375+
private IgniteKernal grid;
375376

376377
/** */
377378
private IgniteConfiguration cfg;
378379

379380
/** */
380381
private GridKernalGateway gw;
381382

382-
/** Network segmented flag. */
383-
private volatile boolean segFlag;
384-
385383
/** Performance suggestions. */
386384
private final GridPerformanceSuggestions perf = new GridPerformanceSuggestions();
387385

@@ -430,7 +428,7 @@ public GridKernalContextImpl() {
430428
@SuppressWarnings("TypeMayBeWeakened")
431429
protected GridKernalContextImpl(
432430
GridLoggerProxy log,
433-
IgniteEx grid,
431+
IgniteKernal grid,
434432
IgniteConfiguration cfg,
435433
GridKernalGateway gw,
436434
List<PluginProvider> plugins,
@@ -614,18 +612,9 @@ else if (!(comp instanceof DiscoveryNodeValidationProcessor
614612
comps.add(comp);
615613
}
616614

617-
/**
618-
* @param helper Helper to add.
619-
*/
620-
public void addHelper(Object helper) {
621-
assert helper != null;
622-
623-
assert false : "Unknown helper class: " + helper.getClass();
624-
}
625-
626615
/** {@inheritDoc} */
627616
@Override public boolean isStopping() {
628-
return ((IgniteKernal)grid).isStopping();
617+
return grid.isStopping();
629618
}
630619

631620
/** */
@@ -703,6 +692,11 @@ public void addHelper(Object helper) {
703692
return maintenanceProc;
704693
}
705694

695+
/** {@inheritDoc} */
696+
@Override public MessageFactory messageFactory() {
697+
return grid.messageFactory();
698+
}
699+
706700
/** {@inheritDoc} */
707701
@Override public CacheObjectTransformerProcessor transformer() {
708702
return transProc;

modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticRequest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.Map;
2525
import java.util.Set;
2626
import java.util.UUID;
27-
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
2827
import org.apache.ignite.internal.util.typedef.internal.S;
2928
import org.apache.ignite.plugin.extensions.communication.Message;
3029
import org.jetbrains.annotations.Nullable;
@@ -49,7 +48,7 @@ public class IgniteDiagnosticRequest implements Message {
4948
private final Map<Object, List<String>> msgs = new LinkedHashMap<>();
5049

5150
/**
52-
* Default constructor required by {@link GridIoMessageFactory}.
51+
* Default constructor required by {@link CoreMessagesProvider}.
5352
*/
5453
public IgniteDiagnosticRequest() {
5554
// No-op.

modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticResponse.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.ignite.internal;
1919

20-
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
2120
import org.apache.ignite.internal.util.typedef.internal.S;
2221
import org.apache.ignite.plugin.extensions.communication.Message;
2322
import org.jetbrains.annotations.Nullable;
@@ -33,7 +32,7 @@ public class IgniteDiagnosticResponse implements Message {
3332
@Nullable String respInfo;
3433

3534
/**
36-
* Default constructor required by {@link GridIoMessageFactory}.
35+
* Default constructor required by {@link CoreMessagesProvider}.
3736
*/
3837
public IgniteDiagnosticResponse() {
3938
// No-op.

modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
100100
import org.apache.ignite.internal.managers.collision.GridCollisionManager;
101101
import org.apache.ignite.internal.managers.communication.GridIoManager;
102+
import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
102103
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
103104
import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData;
104105
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -209,6 +210,8 @@
209210
import org.apache.ignite.plugin.IgnitePlugin;
210211
import org.apache.ignite.plugin.PluginNotFoundException;
211212
import org.apache.ignite.plugin.PluginProvider;
213+
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
214+
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
212215
import org.apache.ignite.spi.IgniteSpi;
213216
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
214217
import org.apache.ignite.spi.discovery.isolated.IsolatedDiscoverySpi;
@@ -436,6 +439,9 @@ public class IgniteKernal implements IgniteEx, Externalizable {
436439
/** The state object is used when reconnection occurs. See {@link IgniteKernal#onReconnected(boolean)}. */
437440
private final ReconnectState reconnectState = new ReconnectState();
438441

442+
/** Core message factory. */
443+
private MessageFactory msgFactory;
444+
439445
/**
440446
* No-arg constructor is required by externalization.
441447
*/
@@ -999,7 +1005,11 @@ public void start(
9991005
}
10001006
startManager(new GridMetricManager(ctx));
10011007
startManager(new GridSystemViewManager(ctx));
1008+
1009+
initMessageFactory();
1010+
10021011
startManager(new GridIoManager(ctx));
1012+
10031013
startManager(new GridCheckpointManager(ctx));
10041014

10051015
startManager(new GridEventStorageManager(ctx));
@@ -1301,6 +1311,31 @@ else if (e instanceof IgniteCheckedException)
13011311
startTimer.finishGlobalStage("Await exchange");
13021312
}
13031313

1314+
/** */
1315+
private void initMessageFactory() throws IgniteCheckedException {
1316+
MessageFactoryProvider[] msgs = ctx.plugins().extensions(MessageFactoryProvider.class);
1317+
1318+
if (msgs == null)
1319+
msgs = new MessageFactoryProvider[0];
1320+
1321+
List<MessageFactoryProvider> compMsgs = new ArrayList<>();
1322+
1323+
compMsgs.add(new CoreMessagesProvider(ctx.marshaller(), ctx.marshallerContext().jdkMarshaller(),
1324+
U.resolveClassLoader(ctx.config())));
1325+
1326+
for (IgniteComponentType compType : IgniteComponentType.values()) {
1327+
MessageFactoryProvider f = compType.messageFactory();
1328+
1329+
if (f != null)
1330+
compMsgs.add(f);
1331+
}
1332+
1333+
if (!compMsgs.isEmpty())
1334+
msgs = F.concat(msgs, compMsgs.toArray(new MessageFactoryProvider[compMsgs.size()]));
1335+
1336+
msgFactory = new IgniteMessageFactoryImpl(msgs);
1337+
}
1338+
13041339
/**
13051340
* @return Ignite security processor. See {@link IgniteSecurity} for details.
13061341
*/
@@ -3028,6 +3063,11 @@ private void checkClusterState() throws IgniteException {
30283063
}
30293064
}
30303065

3066+
/** @return Core message factory. */
3067+
MessageFactory messageFactory() {
3068+
return msgFactory;
3069+
}
3070+
30313071
/**
30323072
* Method is responsible for handling the {@link EventType#EVT_CLIENT_NODE_DISCONNECTED} event. Notify all the
30333073
* GridComponents that the such even has been occurred (e.g. if the local client node disconnected from the cluster

modules/core/src/main/java/org/apache/ignite/internal/TxEntriesInfo.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.HashSet;
2222
import java.util.Objects;
2323
import org.apache.ignite.IgniteCheckedException;
24-
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
2524
import org.apache.ignite.internal.processors.cache.GridCacheContext;
2625
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
2726
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -38,7 +37,7 @@ public final class TxEntriesInfo extends IgniteDiagnosticRequest.DiagnosticBaseI
3837
Collection<KeyCacheObject> keys;
3938

4039
/**
41-
* Empty constructor required by {@link GridIoMessageFactory}.
40+
* Empty constructor required by {@link CoreMessagesProvider}.
4241
*/
4342
public TxEntriesInfo() {
4443
// No-op.

modules/core/src/main/java/org/apache/ignite/internal/TxInfo.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.ignite.internal;
1919

2020
import java.util.Objects;
21-
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
2221
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
2322
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
2423
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -34,7 +33,7 @@ public final class TxInfo extends IgniteDiagnosticRequest.DiagnosticBaseInfo {
3433
GridCacheVersion nearVer;
3534

3635
/**
37-
* Empty constructor required by {@link GridIoMessageFactory}.
36+
* Empty constructor required by {@link CoreMessagesProvider}.
3837
*/
3938
public TxInfo() {
4039
// No-op.

0 commit comments

Comments
 (0)