Skip to content

Commit 61a56ed

Browse files
committed
Add initial implementation of OpAMP
Just reports effective config at startup, but adds wiring for eventual dynamic config updates and reporting.
1 parent 6c95544 commit 61a56ed

File tree

12 files changed

+603
-8
lines changed

12 files changed

+603
-8
lines changed

.github/workflows/ci-main.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ jobs:
2727
steps:
2828
- uses: actions/checkout@v6
2929

30+
- name: Enable long paths (Windows)
31+
if: runner.os == 'Windows'
32+
run: git config --system core.longpaths true
33+
3034
- name: Set up Python ${{ matrix.python-version }}
3135
uses: actions/setup-python@v6
3236
with:

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ dependencies = [
3333
"opentelemetry-instrumentation-system-metrics==0.60b1",
3434
"opentelemetry-semantic-conventions==0.60b1",
3535
"protobuf>=6.31.1", # not our direct dep, prevents installing vulnerable proto versions (CVE‑2025‑4565)
36+
"opentelemetry-opamp-client @ git+https://github.com/open-telemetry/opentelemetry-python-contrib.git@88e5bfc630baa9a0789dd28694319afd506eae09#subdirectory=opamp/opentelemetry-opamp-client", # unreleased, pinned to merge commit of PR #3635
37+
"requests>=2.20.0", # OpAMP: HTTP transport (already transitive dep, making explicit)
3638
]
3739

3840
[project.urls]
@@ -46,6 +48,9 @@ configurator = "splunk_otel.configurator:SplunkConfigurator"
4648
[project.entry-points.opentelemetry_distro]
4749
splunk_distro = "splunk_otel.distro:SplunkDistro"
4850

51+
[tool.hatch.metadata]
52+
allow-direct-references = true
53+
4954
[tool.hatch.version]
5055
path = "src/splunk_otel/__about__.py"
5156

src/splunk_otel/callgraphs/__init__.py

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,30 @@
88
SPLUNK_SNAPSHOT_SAMPLING_INTERVAL,
99
)
1010

11+
_DEFAULT_SNAPSHOT_SAMPLING_INTERVAL = 10
1112

12-
def _configure_callgraphs_if_enabled(env=None):
13+
14+
class CallgraphsState:
    """Snapshot-profiler runtime state, exposed so OpAMP can report it."""

    def __init__(self, processor: "CallgraphsSpanProcessor | None", interval: int):
        # ``processor`` is None when no span processor was installed; ``interval``
        # is the fallback value reported in that case.
        self._interval = interval
        self._processor = processor

    def is_enabled(self) -> bool:
        """Return True when a span processor is held."""
        return not (self._processor is None)

    def interval(self) -> int:
        """Return the processor's current interval, or the stored one without a processor."""
        active = self._processor
        return self._interval if active is None else active.interval_millis()
28+
29+
30+
def _configure_callgraphs_if_enabled(env=None) -> CallgraphsState:
    """Install the callgraphs span processor when the snapshot profiler is enabled.

    Args:
        env: Optional environment accessor; a fresh ``Env()`` is used when falsy.

    Returns:
        A ``CallgraphsState`` holding the installed processor, or no processor
        when SPLUNK_SNAPSHOT_PROFILER_ENABLED is not true. The sampling interval
        read from the environment is carried in both cases.
    """
    env = env or Env()
    sampling_interval = env.getint(SPLUNK_SNAPSHOT_SAMPLING_INTERVAL, _DEFAULT_SNAPSHOT_SAMPLING_INTERVAL)
    if not env.is_true(SPLUNK_SNAPSHOT_PROFILER_ENABLED):
        return CallgraphsState(None, sampling_interval)

    span_processor = CallgraphsSpanProcessor(env.getval(OTEL_SERVICE_NAME), sampling_interval)
    trace.get_tracer_provider().add_span_processor(span_processor)
    return CallgraphsState(span_processor, sampling_interval)

src/splunk_otel/callgraphs/span_processor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ def on_end(self, span: ReadableSpan) -> None:
7474
if len(self._span_id_to_trace_id) == 0:
7575
self._profiler.pause_after(60.0)
7676

77+
def interval_millis(self) -> int:
    """Return the profiler's sampling interval in whole milliseconds.

    ``interval_seconds`` is scaled back to milliseconds here. Float scaling can
    land just below the intended integer (e.g. 1.001 * 1000 ==
    1000.9999999999999), so the value is rounded to the nearest millisecond
    rather than truncated, which would report 1000 instead of 1001.
    """
    return round(self._profiler.interval_seconds * 1000)
79+
7780
def shutdown(self) -> None:
7881
self._profiler.stop()
7982

src/splunk_otel/configurator.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,56 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from opentelemetry import trace
1516
from opentelemetry.sdk._configuration import _OTelSDKConfigurator
1617

1718
from splunk_otel.profile import _start_profiling_if_enabled
1819
from splunk_otel.callgraphs import _configure_callgraphs_if_enabled
20+
from splunk_otel.opamp import _start_opamp_if_enabled
21+
from splunk_otel.opamp.config_registry import ConfigRegistry
22+
from splunk_otel.env import (
23+
Env,
24+
SPLUNK_PROFILER_ENABLED,
25+
SPLUNK_PROFILER_CALL_STACK_INTERVAL,
26+
SPLUNK_SNAPSHOT_PROFILER_ENABLED,
27+
SPLUNK_SNAPSHOT_SAMPLING_INTERVAL,
28+
OTEL_SERVICE_NAME,
29+
OTEL_EXPORTER_OTLP_ENDPOINT,
30+
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
31+
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
32+
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
33+
)
1934

2035

2136
class SplunkConfigurator(_OTelSDKConfigurator):
    """SDK configurator that additionally starts profiling, callgraphs and OpAMP."""

    def _configure(self, **kwargs):
        """Run the base SDK configuration, then start the Splunk-specific components."""
        super()._configure(**kwargs)

        env = Env()
        profiling_state = _start_profiling_if_enabled(env)
        callgraphs_state = _configure_callgraphs_if_enabled(env)

        config_registry = self._build_registry(profiling_state, callgraphs_state, env)
        tracer_provider = trace.get_tracer_provider()
        _start_opamp_if_enabled(tracer_provider.resource.attributes, config_registry, env)

    def _build_registry(self, profiling_state, callgraphs_state, env) -> ConfigRegistry:
        """Build the registry of values reported as effective config.

        Registry keys are the canonical EffectiveConfig field names (which happen to
        match env var names). Profiler and snapshot entries read the state objects at
        call time rather than the env vars, so reported values follow runtime state.
        Entries that can only change on restart are read from the env directly.
        """

        def _from_env(name):
            # Bind ``name`` per entry; empty values are reported as None.
            return lambda: env.getval(name) or None

        live_entries = (
            (SPLUNK_PROFILER_ENABLED, lambda: str(profiling_state.is_enabled()).lower()),
            (SPLUNK_PROFILER_CALL_STACK_INTERVAL, lambda: str(profiling_state.interval_millis())),
            (SPLUNK_SNAPSHOT_PROFILER_ENABLED, lambda: str(callgraphs_state.is_enabled()).lower()),
            (SPLUNK_SNAPSHOT_SAMPLING_INTERVAL, lambda: str(callgraphs_state.interval())),
        )
        static_keys = (
            OTEL_SERVICE_NAME,
            OTEL_EXPORTER_OTLP_ENDPOINT,
            OTEL_EXPORTER_OTLP_TRACES_ENDPOINT,
            OTEL_EXPORTER_OTLP_METRICS_ENDPOINT,
            OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
        )

        registry = ConfigRegistry()
        for name, live_getter in live_entries:
            registry.register(name, getter=live_getter)
        for name in static_keys:
            registry.register(name, getter=_from_env(name))
        return registry

src/splunk_otel/env.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@
5555
SPLUNK_SNAPSHOT_SAMPLING_INTERVAL = "SPLUNK_SNAPSHOT_SAMPLING_INTERVAL"
5656
SPLUNK_SNAPSHOT_SELECTION_PROBABILITY = "SPLUNK_SNAPSHOT_SELECTION_PROBABILITY"
5757
SPLUNK_REALM = "SPLUNK_REALM"
58+
SPLUNK_OPAMP_ENABLED = "SPLUNK_OPAMP_ENABLED"
59+
SPLUNK_OPAMP_ENDPOINT = "SPLUNK_OPAMP_ENDPOINT"
60+
SPLUNK_OPAMP_TOKEN = "SPLUNK_OPAMP_TOKEN" # noqa: S105
61+
SPLUNK_OPAMP_POLLING_INTERVAL = "SPLUNK_OPAMP_POLLING_INTERVAL"
62+
OTEL_SERVICE_NAME = "OTEL_SERVICE_NAME"
63+
OTEL_EXPORTER_OTLP_ENDPOINT = "OTEL_EXPORTER_OTLP_ENDPOINT"
64+
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
65+
OTEL_EXPORTER_OTLP_METRICS_ENDPOINT = "OTEL_EXPORTER_OTLP_METRICS_ENDPOINT"
66+
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT = "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"
5867

5968
_pylogger = logging.getLogger(__name__)
6069

src/splunk_otel/opamp/__init__.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Copyright Splunk Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
17+
from opentelemetry.sdk.resources import (
18+
TELEMETRY_SDK_LANGUAGE,
19+
TELEMETRY_SDK_NAME,
20+
TELEMETRY_SDK_VERSION,
21+
)
22+
from opentelemetry._opamp.agent import OpAMPAgent
23+
from opentelemetry._opamp.client import OpAMPClient
24+
from opentelemetry._opamp.proto import opamp_pb2
25+
26+
from splunk_otel.env import (
27+
Env,
28+
SPLUNK_ACCESS_TOKEN,
29+
SPLUNK_OPAMP_ENABLED,
30+
SPLUNK_OPAMP_ENDPOINT,
31+
SPLUNK_OPAMP_POLLING_INTERVAL,
32+
SPLUNK_OPAMP_TOKEN,
33+
)
34+
from splunk_otel.distro import _DISTRO_NAME
35+
from splunk_otel.opamp.config_registry import ConfigRegistry
36+
37+
logger = logging.getLogger(__name__)
38+
39+
_IDENTIFYING_RESOURCE_KEYS = (
40+
"service.name",
41+
"service.namespace",
42+
"service.instance.id",
43+
"service.version",
44+
)
45+
46+
_NON_IDENTIFYING_RESOURCE_KEYS = (
47+
"os.type",
48+
"os.name",
49+
"os.version",
50+
"host.name",
51+
"host.arch",
52+
"process.pid",
53+
"process.runtime.name",
54+
"process.runtime.version",
55+
# deployment.environment.name is kept non-identifying: it labels/filters an instance
56+
# but does not make it unique: two instances of the same service in the same environment
57+
# would share this value, so not sure that it belongs in identifying_attributes.
58+
# Note: splunk-otel-java puts this in identifying.
59+
"deployment.environment.name",
60+
)
61+
62+
_DEFAULT_POLLING_INTERVAL_MS = 30000
63+
64+
65+
def _start_opamp_if_enabled(resource_attrs, registry: ConfigRegistry, env: Env) -> None:
    """Start the OpAMP client when it is enabled and an endpoint is configured.

    Args:
        resource_attrs: Mapping of resource attributes, forwarded to build the
            agent's identifying / non-identifying attributes.
        registry: Registry whose values are sent as the effective config.
        env: Environment accessor used to read the SPLUNK_OPAMP_* settings.

    Does nothing (beyond logging) when SPLUNK_OPAMP_ENABLED is not true or
    SPLUNK_OPAMP_ENDPOINT is empty. Any exception raised while starting the
    client is logged and not re-raised.
    """
    if not env.is_true(SPLUNK_OPAMP_ENABLED):
        logger.debug("OpAMP disabled (SPLUNK_OPAMP_ENABLED not set to true)")
        return

    endpoint = env.getval(SPLUNK_OPAMP_ENDPOINT)
    if not endpoint:
        logger.warning("SPLUNK_OPAMP_ENABLED=true but SPLUNK_OPAMP_ENDPOINT is not set; OpAMP disabled")
        return

    # Prefer the OpAMP-specific token, falling back to the general access token.
    token = env.getval(SPLUNK_OPAMP_TOKEN) or env.getval(SPLUNK_ACCESS_TOKEN)

    polling_interval_ms = env.getint(SPLUNK_OPAMP_POLLING_INTERVAL, _DEFAULT_POLLING_INTERVAL_MS)
    # A zero, negative or missing interval would be handed to the agent as its
    # polling period; fall back to the default instead of passing it through.
    if not polling_interval_ms or polling_interval_ms <= 0:
        logger.warning(
            "%s must be a positive number of milliseconds (got %s); using default %s",
            SPLUNK_OPAMP_POLLING_INTERVAL,
            polling_interval_ms,
            _DEFAULT_POLLING_INTERVAL_MS,
        )
        polling_interval_ms = _DEFAULT_POLLING_INTERVAL_MS

    logger.info("Starting OpAMP client: %s", endpoint)

    try:
        _start_opamp(endpoint, token, polling_interval_ms, registry, resource_attrs)
    except Exception:
        logger.exception("Failed to start OpAMP client")
85+
86+
87+
def _start_opamp(endpoint: str, token: str, polling_interval_ms: int, registry: ConfigRegistry, resource_attrs):
    """Create the OpAMP client, hand it the current effective config, and start the agent.

    A bearer Authorization header is sent only when ``token`` is truthy. The
    polling interval is converted from milliseconds to seconds for the agent.
    """
    auth_headers = {"Authorization": f"Bearer {token}"} if token else {}
    identifying, non_identifying = _build_agent_attributes(resource_attrs)

    opamp_client = OpAMPClient(
        endpoint=endpoint,
        headers=auth_headers,
        agent_identifying_attributes=identifying,
        agent_non_identifying_attributes=non_identifying,
    )
    # The whole registry snapshot is sent under the empty-string key.
    effective_config = {"": registry.get_all()}
    opamp_client.update_effective_config(effective_config, content_type="application/json")

    poll_seconds = polling_interval_ms / 1000
    opamp_agent = OpAMPAgent(
        interval=poll_seconds,
        message_handler=_handle_server_message,
        client=opamp_client,
    )
    opamp_agent.start()
    logger.info("OpAMP client started")
110+
111+
112+
def _build_agent_attributes(resource_attrs) -> tuple[dict, dict]:
    """Derive the agent's identifying and non-identifying attributes.

    Args:
        resource_attrs: Mapping of resource attributes to pick values from.

    Returns:
        ``(identifying, non_identifying)`` dicts with string values. Resource keys
        that are absent or None are omitted. The identifying dict always carries
        the SDK language/name/version and the distro name/version; the SDK version
        falls back to "unknown" when the resource does not provide it.
    """
    from splunk_otel.__about__ import __version__ as distro_version

    def _select(keys):
        # Keep only keys that resolve to a non-None value, stringified.
        return {k: str(v) for k in keys if (v := resource_attrs.get(k)) is not None}

    identifying = {
        **_select(_IDENTIFYING_RESOURCE_KEYS),
        # Fixed entries come last so they override same-named resource keys.
        TELEMETRY_SDK_LANGUAGE: "python",
        TELEMETRY_SDK_NAME: "opentelemetry",
        TELEMETRY_SDK_VERSION: str(resource_attrs.get(TELEMETRY_SDK_VERSION, "unknown")),
        "telemetry.distro.name": _DISTRO_NAME,
        "telemetry.distro.version": distro_version,
    }
    return identifying, _select(_NON_IDENTIFYING_RESOURCE_KEYS)
138+
139+
140+
def _handle_server_message(
    _agent: OpAMPAgent,
    _client: OpAMPClient,
    message: opamp_pb2.ServerToAgent,
) -> None:
    """Handle a ServerToAgent message; currently only logs its flags at debug level.

    The agent and client arguments are accepted but unused.
    """
    server_flags = message.flags
    logger.debug("ServerToAgent: flags=%s", server_flags)
    # TODO (Phase 2): check for ServerToAgentFlags_ReportFullState and respond with
    # build_full_state_message(). Not needed in Phase 1 — we don't accept remote config
    # so effective config never changes after startup, making a resync request a no-op.

0 commit comments

Comments
 (0)