Skip to content

feat: user session sequence analyzer for API behaviour modelling#4813

Open
gauravakto wants to merge 4 commits intofeature/mini-runtime-releasefrom
feat/sequence-building
Open

feat: user session sequence analyzer for API behaviour modelling#4813
gauravakto wants to merge 4 commits intofeature/mini-runtime-releasefrom
feat/sequence-building

Conversation

@gauravakto
Copy link
Copy Markdown
Contributor

@gauravakto gauravakto commented Apr 14, 2026

Summary

  • Adds a new `behaviour_modelling` module under `mini-runtime` that tracks per-user API call sequences within tumbling 10-minute windows
  • Aggregates API call counts and transition counts (A→B pairs, extensible to N-grams) for downstream sequence probability modelling
  • Uses `AktoPolicyNew` for URL templatization (raw URLs → parametrized templates like `/users/INTEGER`) without any extra DB calls
  • Flushes per-window transition probabilities to MongoDB `api_sequences` via the `DataActor`/`DbActor`/`ClientActor` abstraction layer
  • Designed for 50-100k req/min throughput with minimal memory footprint

How It Works

Per-record hot path

flowchart TD
    A[HttpResponseParams arrives] --> B[AktoPolicyNew.generateFromHttpResponseParams\nRaw URL → templatized ApiInfoKey]
    B --> C{ApiInfoKey known in\napiInfoCatalogMap?}
    C -- No, unknown API --> X[Skip record]
    C -- Yes --> D[UserIdentifier.extractUserId\nIP address for now]
    D --> E{UserSessionState\nexists?}
    E -- No --> F[Create new state\ndeque = empty\nsessionStart = now]
    E -- Yes --> G{sessionStart\n< windowStart?}
    F --> H
    G -- Yes, stale window --> I[Reset deque\nsessionStart = now]
    G -- No, same window --> H
    I --> H[Check deque size\nsequenceLength - 1 = 1]
    H -- deque has 1 entry --> J[Build TransitionKey\ndeque-0 → current ApiInfoKey\ne.g. GET:/orders → POST:/checkout]
    H -- deque empty --> K
    J --> K[WindowAccumulator\nrecordTransition + recordApiCall]
    K --> L[Push ApiInfoKey onto deque\npop front if size >= sequenceLength-1]
Loading

2-length sequence example (single user, one 10-min window)

t=0m  User calls  GET /products          deque: []          → no transition yet
                                          apiCounts: { GET:/products: 1 }

t=1m  User calls  GET /products/abc123   templatized → GET /products/STRING
      deque: [GET:/products]
                                          → emit transition  GET:/products → GET:/products/STRING
                                          apiCounts: { GET:/products: 1, GET:/products/STRING: 1 }
                                          transitionCounts: { GET:/products → GET:/products/STRING : 1 }

t=3m  User calls  POST /cart/add         deque: [GET:/products/STRING]
                                          → emit transition  GET:/products/STRING → POST:/cart/add
                                          apiCounts: { ..., POST:/cart/add: 1 }
                                          transitionCounts: { ..., GET:/products/STRING → POST:/cart/add : 1 }

t=5m  User calls  POST /checkout         deque: [POST:/cart/add]
                                          → emit transition  POST:/cart/add → POST:/checkout
                                          ...

t=10m ── WINDOW END ──────────────────────────────────────────────────────────────────────
      currentAccumulator swapped to fresh one (new writes go here immediately)
      WindowSnapshot taken from old accumulator → sent async to WindowFlusher
      Per-user deques reset lazily on next access (no sweep needed)

10-minute window flip

sequenceDiagram
    participant K as Kafka records
    participant SA as SessionAnalyzer
    participant CA as CurrentAccumulator (Window N)
    participant NA as NewAccumulator (Window N+1)
    participant F as ApiSequencesFlusher

    K->>SA: process() calls stream in continuously

    Note over SA,CA: t = 0..10min, all writes go to Window N

    Note over SA: t = 10min — onWindowEnd() fires
    SA->>NA: create fresh accumulator
    SA-->>CA: volatile swap: currentAccumulator = NA
    Note over K,NA: new process() calls now write to Window N+1
    SA->>CA: snapshot(windowStart, windowEnd)
    SA->>F: CompletableFuture.runAsync → flush(snapshot)
    Note over F: async — hot path unblocked immediately

    F->>F: Build ApiSequences list\nfor each TransitionKey:\n  paths = [from.toString(), to.toString()]\n  transitionCount, prevStateCount, probability
    F->>DA: DataActor.writeApiSequences(List<ApiSequences>)
    DA->>DB: MongoDB bulkWrite $inc + $setOnInsert upsert\non (apiCollectionId, paths)

    Note over SA,NA: t = 10..20min, all writes go to Window N+1
Loading

Flush payload → MongoDB `api_sequences`

Each flushed transition becomes an upsert keyed on `(apiCollectionId, paths)`:

apiCollectionId: 1234
paths: ["GET /products", "GET /products/STRING"]
transitionCount: 830        ← $inc (cumulative across windows)
prevStateCount:  1820       ← $inc (cumulative)
probability:     0.456      ← $set (latest window value)
lastUpdatedAt:   <epoch>    ← $set
createdAt:       <epoch>    ← $setOnInsert (only on first insert)
isActive:        true       ← $setOnInsert

Module Structure

behaviour_modelling/
├── core/
│   ├── UserIdentifier.java      — extracts user identity from HttpResponseParams
│   ├── WindowAccumulator.java   — accumulates counts; userId param wired for phase-2 unique-user weighting
│   └── WindowFlusher.java       — receives completed WindowSnapshot at end of each window
├── model/
│   ├── TransitionKey.java       — ApiInfoKey[] sequence key with Arrays.equals/hashCode
│   ├── UserSessionState.java    — per-user Deque<ApiInfoKey>; lazily reset on window boundary
│   └── WindowSnapshot.java      — immutable Map<ApiInfoKey,Long> + Map<TransitionKey,Long>
├── impl/
│   ├── IpBasedIdentifier.java   — uses sourceIP as user identity
│   ├── RawCountAccumulator.java — ConcurrentHashMap<LongAdder> for concurrent hot-path increments
│   └── ApiSequencesFlusher.java — converts snapshot → List<ApiSequences> → DataActor.writeApiSequences()
├── SequenceAnalyzerConfig.java  — holds: sequenceLength, windowDurationMs, UserIdentifier,
│                                  Supplier<WindowAccumulator>, WindowFlusher, AktoPolicyNew
└── SessionAnalyzer.java         — orchestrator: process(), atomic window flip, async flush

DataActor layer additions:

  • `DataActor.writeApiSequences(List)` — new abstract method
  • `DbActor.writeApiSequences` — MongoDB `$inc` upsert via `ApiSequencesDao`
  • `ClientActor.writeApiSequences` — HTTP POST to `/writeApiSequences` on database-abstractor

Test plan

  • Unit test `RawCountAccumulator`: concurrent increments produce correct counts
  • Unit test `SessionAnalyzer.process()`: transitions emitted correctly for 2-length sequences
  • Unit test window boundary: no cross-window transitions emitted
  • Unit test templatization: raw numeric/UUID segments resolve to same `ApiInfoKey`
  • Integration: wire `SessionAnalyzer` into `handleResponseParams` and verify snapshot shape
  • Integration: `ApiSequencesFlusher` writes correct documents to `api_sequences` collection

🤖 Generated with Claude Code

Introduces a new behaviour_modelling module under mini-runtime that tracks
per-user API call sequences within tumbling 10-minute windows and aggregates
transition counts for downstream sequence probability modelling.
…equencesFlusher

- DataActor: add abstract writeApiSequences(List<ApiSequences>)
- DbActor: implement with $inc upsert directly against ApiSequencesDao
- ClientActor: implement with batched HTTP POST to /writeApiSequences
- ApiSequencesFlusher: WindowFlusher impl that converts WindowSnapshot
  to ApiSequences and calls dataActor.writeApiSequences()
- Main.java: wire ApiSequencesFlusher into SessionAnalyzer initialization
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant