Skip to content

Commit f04eafc

Browse files
authored
[ISSUE #10266] Fix OOM caused by OpenTelemetry 1.44 OtlpGrpcMetricExporter pool race (#10267)
OpenTelemetry Java 1.44.0 ~ 1.46.x ships OtlpGrpcMetricExporter with MemoryMode.REUSABLE_DATA by default. The underlying MetricReusableDataMarshaler.marshalerPool is a non-thread-safe ArrayDeque accessed concurrently by the reader thread (poll) and the OkHttp callback thread (add, via whenComplete). With BatchSplittingMetricExporter issuing N concurrent sub-batch exports per cycle, the pool races and leaks marshalers (~132 KiB each) until OOM. Fixed upstream in 1.47.0 via open-telemetry/opentelemetry-java#7041 (ArrayDeque -> ConcurrentLinkedDeque). - Bump OpenTelemetry to 1.47.0 in pom.xml so the upstream race fix is in effect. - Default OtlpGrpcMetricExporter to MemoryMode.IMMUTABLE_DATA to preserve the pre-1.44 default behavior; exposed via brokerConfig.metricsExportOtelMemoryMode ("IMMUTABLE_DATA" / "REUSABLE_DATA", case-insensitive). Operators may opt in to REUSABLE_DATA when running on OTel >= 1.47. - Cap concurrent in-flight sub-batches in BatchSplittingMetricExporter with a Semaphore controlled by brokerConfig.metricsExportBatchMaxConcurrent (default 4; set to 1 to serialize and match pre-batch behavior; 0 or Integer.MAX_VALUE means unlimited). - Add brokerConfig.metricsExportBatchSplitEnabled (default true) as an escape hatch to bypass BatchSplittingMetricExporter entirely, restoring the raw OtlpGrpcMetricExporter wiring. - Defensively snapshot MetricData points before export to avoid ArrayIndexOutOfBoundsException in NumberDataPointMarshaler when async instrument callbacks mutate point collections during export.
1 parent 94fbfcf commit f04eafc

File tree

6 files changed

+327
-36
lines changed

6 files changed

+327
-36
lines changed

WORKSPACE

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,15 @@ maven_install(
8989
"io.grpc:grpc-api:1.47.0",
9090
"io.grpc:grpc-testing:1.47.0",
9191
"org.springframework:spring-core:5.3.26",
92-
"io.opentelemetry:opentelemetry-exporter-otlp:1.29.0",
93-
"io.opentelemetry:opentelemetry-exporter-prometheus:1.29.0-alpha",
94-
"io.opentelemetry:opentelemetry-exporter-logging:1.29.0",
95-
"io.opentelemetry:opentelemetry-sdk:1.29.0",
96-
"io.opentelemetry:opentelemetry-exporter-logging-otlp:1.29.0",
92+
"io.opentelemetry:opentelemetry-exporter-otlp:1.47.0",
93+
"io.opentelemetry:opentelemetry-exporter-prometheus:1.47.0-alpha",
94+
"io.opentelemetry:opentelemetry-exporter-logging:1.47.0",
95+
"io.opentelemetry:opentelemetry-sdk:1.47.0",
96+
"io.opentelemetry:opentelemetry-exporter-logging-otlp:1.47.0",
9797
"com.squareup.okio:okio-jvm:3.0.0",
98-
"io.opentelemetry:opentelemetry-api:1.29.0",
99-
"io.opentelemetry:opentelemetry-sdk-metrics:1.29.0",
100-
"io.opentelemetry:opentelemetry-sdk-common:1.29.0",
98+
"io.opentelemetry:opentelemetry-api:1.47.0",
99+
"io.opentelemetry:opentelemetry-sdk-metrics:1.47.0",
100+
"io.opentelemetry:opentelemetry-sdk-common:1.47.0",
101101
"io.github.aliyunmq:rocketmq-slf4j-api:1.0.0",
102102
"io.github.aliyunmq:rocketmq-logback-classic:1.0.0",
103103
"org.slf4j:jul-to-slf4j:2.0.6",

broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.ArrayList;
4747
import java.util.Collection;
4848
import java.util.List;
49+
import java.util.concurrent.Semaphore;
4950
import java.util.function.IntSupplier;
5051

5152
/**
@@ -70,6 +71,16 @@
7071
* multiple smaller MetricData objects, each preserving the
7172
* original resource, scope, name, description, unit, and
7273
* type metadata.
74+
*
75+
* <p>Sub-batch submissions are bounded by a per-call
76+
* semaphore whose permit count comes from
77+
* {@code maxConcurrentSupplier}. This caps the number of
78+
* batches that can be in-flight to the delegate at once,
79+
* preventing the MetricData retention window from being
80+
* multiplied by the batch count under OTel SDK 1.31+ where
81+
* the OTLP exporter may hold references until the RPC
82+
* completes. A value &lt;= 0 or Integer.MAX_VALUE disables
83+
* the limit (legacy behavior).
7384
*/
7485
public final class BatchSplittingMetricExporter
7586
implements MetricExporter {
@@ -85,16 +96,24 @@ public final class BatchSplittingMetricExporter
8596
/** Supplies the max data points per batch at runtime. */
8697
private final IntSupplier maxBatchSizeSupplier;
8798

99+
/** Supplies the max concurrent in-flight batches. */
100+
private final IntSupplier maxConcurrentSupplier;
101+
88102
/**
89103
* Creates a new BatchSplittingMetricExporter.
90104
*
91105
* @param metricExporter the underlying MetricExporter
92106
* @param batchSizeSupplier supplies the max number
93107
* of data points per batch; must return &gt; 0
108+
* @param concurrencySupplier supplies the max number
109+
* of sub-batches that can be in-flight to the
110+
* delegate at once; &lt;= 0 or Integer.MAX_VALUE
111+
* means unlimited
94112
*/
95113
public BatchSplittingMetricExporter(
96114
final MetricExporter metricExporter,
97-
final IntSupplier batchSizeSupplier) {
115+
final IntSupplier batchSizeSupplier,
116+
final IntSupplier concurrencySupplier) {
98117
if (metricExporter == null) {
99118
throw new NullPointerException(
100119
"metricExporter must not be null");
@@ -103,8 +122,13 @@ public BatchSplittingMetricExporter(
103122
throw new NullPointerException(
104123
"batchSizeSupplier must not be null");
105124
}
125+
if (concurrencySupplier == null) {
126+
throw new NullPointerException(
127+
"concurrencySupplier must not be null");
128+
}
106129
this.delegate = metricExporter;
107130
this.maxBatchSizeSupplier = batchSizeSupplier;
131+
this.maxConcurrentSupplier = concurrencySupplier;
108132
}
109133

110134
/** {@inheritDoc} */
@@ -136,23 +160,48 @@ public CompletableResultCode export(
136160
List<List<MetricData>> batches =
137161
splitIntoBatches(snapshotted, maxBatchSize);
138162

163+
int maxConcurrent =
164+
maxConcurrentSupplier.getAsInt();
165+
final Semaphore semaphore =
166+
(maxConcurrent > 0
167+
&& maxConcurrent < Integer.MAX_VALUE)
168+
? new Semaphore(maxConcurrent)
169+
: null;
170+
139171
LOGGER.debug(
140172
"Splitting metrics export: "
141173
+ "totalDataPoints={}, "
142174
+ "maxBatchSize={}, "
143-
+ "batchCount={}",
175+
+ "batchCount={}, "
176+
+ "maxConcurrent={}",
144177
totalDataPoints, maxBatchSize,
145-
batches.size());
178+
batches.size(), maxConcurrent);
146179

147180
List<CompletableResultCode> results =
148181
new ArrayList<>(batches.size());
149182
for (int i = 0; i < batches.size(); i++) {
150183
final List<MetricData> batch =
151184
batches.get(i);
152185
final int batchIndex = i;
186+
if (semaphore != null) {
187+
try {
188+
semaphore.acquire();
189+
} catch (InterruptedException e) {
190+
Thread.currentThread().interrupt();
191+
LOGGER.warn(
192+
"Interrupted while waiting "
193+
+ "for export slot; "
194+
+ "submitted {} of {} batches",
195+
i, batches.size());
196+
break;
197+
}
198+
}
153199
CompletableResultCode r =
154200
delegate.export(batch);
155201
r.whenComplete(() -> {
202+
if (semaphore != null) {
203+
semaphore.release();
204+
}
156205
if (!r.isSuccess()) {
157206
logFailedBatch(batchIndex, batch);
158207
}

broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
2929
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
3030
import io.opentelemetry.sdk.OpenTelemetrySdk;
31+
import io.opentelemetry.sdk.common.export.MemoryMode;
3132
import io.opentelemetry.sdk.metrics.Aggregation;
3233
import io.opentelemetry.sdk.metrics.InstrumentSelector;
3334
import io.opentelemetry.sdk.metrics.InstrumentType;
@@ -69,6 +70,7 @@
6970
import java.util.Arrays;
7071
import java.util.HashMap;
7172
import java.util.List;
73+
import java.util.Locale;
7274
import java.util.Map;
7375
import java.util.concurrent.TimeUnit;
7476
import java.util.function.Supplier;
@@ -318,6 +320,19 @@ private boolean checkConfig() {
318320
return false;
319321
}
320322

323+
private static MemoryMode resolveMemoryMode(String configured) {
324+
if (StringUtils.isBlank(configured)) {
325+
return MemoryMode.IMMUTABLE_DATA;
326+
}
327+
try {
328+
return MemoryMode.valueOf(configured.trim().toUpperCase(Locale.ROOT));
329+
} catch (IllegalArgumentException e) {
330+
LOGGER.warn("Invalid metricsExportOtelMemoryMode '{}', falling back to IMMUTABLE_DATA. Valid values: IMMUTABLE_DATA, REUSABLE_DATA.",
331+
configured);
332+
return MemoryMode.IMMUTABLE_DATA;
333+
}
334+
}
335+
321336
private void init() {
322337
MetricsExporterType metricsExporterType = brokerConfig.getMetricsExporterType();
323338
if (metricsExporterType == MetricsExporterType.DISABLE) {
@@ -356,9 +371,16 @@ private void init() {
356371
if (!endpoint.startsWith("http")) {
357372
endpoint = "https://" + endpoint;
358373
}
374+
// OTel 1.44.0 ~ 1.46.x defaults OtlpGrpcMetricExporter to REUSABLE_DATA,
375+
// whose MetricReusableDataMarshaler uses a non-thread-safe ArrayDeque pool.
376+
// Combined with BatchSplittingMetricExporter's concurrent sub-batch export
377+
// this triggers a pool race that leaks marshalers until OOM (fixed upstream
378+
// in 1.47.0 via opentelemetry-java#7041). IMMUTABLE_DATA bypasses that path.
379+
MemoryMode memoryMode = resolveMemoryMode(brokerConfig.getMetricsExportOtelMemoryMode());
359380
OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder()
360381
.setEndpoint(endpoint)
361382
.setCompression("gzip")
383+
.setMemoryMode(memoryMode)
362384
.setTimeout(brokerConfig.getMetricGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
363385
.setAggregationTemporalitySelector(type -> {
364386
if (brokerConfig.isMetricsInDelta() &&
@@ -384,8 +406,16 @@ private void init() {
384406
}
385407

386408
OtlpGrpcMetricExporter otlpExporter = metricExporterBuilder.build();
387-
metricExporter = new BatchSplittingMetricExporter(otlpExporter,
388-
brokerConfig::getMetricsExportBatchMaxDataPoints);
409+
if (brokerConfig.isMetricsExportBatchSplitEnabled()) {
410+
metricExporter = new BatchSplittingMetricExporter(otlpExporter,
411+
brokerConfig::getMetricsExportBatchMaxDataPoints,
412+
brokerConfig::getMetricsExportBatchMaxConcurrent);
413+
} else {
414+
// Escape hatch: skip the splitter wrapper entirely and use the raw
415+
// OTLP exporter. Gives up the oversized-payload guard but removes
416+
// any splitter-side overhead/risk. Re-enable if needed.
417+
metricExporter = otlpExporter;
418+
}
389419

390420
periodicMetricReader = PeriodicMetricReader.builder(metricExporter)
391421
.setInterval(brokerConfig.getMetricGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS)

0 commit comments

Comments
 (0)