# hobom-event-processor

A Go service that polls domain outbox events via gRPC and forwards them to Kafka. Failed events are stored in a Redis-backed Dead Letter Queue (DLQ) and can be replayed via a management API.


## Architecture

```
┌──────────────────────────────┐  ┌──────────────────────────────┐
│  for-hobom-backend (gRPC)    │  │  hobom-space-backend (gRPC)  │
│  Outbox: PENDING → SENT      │  │  Outbox: PENDING → SENT      │
└──────────────┬───────────────┘  └──────────────┬───────────────┘
               │ gRPC poll (5s)                  │ gRPC poll (5s)
    ┌──────────▼──────────┐           ┌──────────▼──────────┐
    │  MessagePoller      │           │  SpacePoller        │
    │  LogPoller          │           │  SpaceLogPoller     │
    └──────────┬──────────┘           └──────────┬──────────┘
               │                                 │
               └──────────────┬──────────────────┘
                   ┌──────────▼──────────┐
                   │  publishWithRetry   │  3 attempts, exponential backoff
                   └──────┬──────────┬───┘
                          │          │ on failure
              ┌───────────▼──┐   ┌───▼────────────────────────┐
              │  Kafka       │   │  Redis DLQ (TTL: 72h)      │
              │  Topics      │   │  Key: dlq:[category]:[id]  │
              └──────────────┘   └────────────┬───────────────┘
                                              │
                                 ┌────────────▼───────────────┐
                                 │  DLQ Management API (Gin)  │
                                 │  GET  /dlq                 │
                                 │  GET  /dlq/:key            │
                                 │  POST /dlq/retry/:key      │
                                 └────────────────────────────┘
```

### LawPoller (Direct Orchestration — No Kafka)

LAW_CHANGED events are orchestrated directly, without going through Kafka.

```
┌──────────────────────────┐
│  for-hobom-backend       │
│  Outbox: LAW_CHANGED     │
└────────────┬─────────────┘
             │ gRPC poll (5s)
  ┌──────────▼──────────┐
  │  LawPoller          │
  └──┬──────────────┬───┘
     │              │
     │ 1. Generate  │ 3. Save result
     ▼              ▼
┌──────────┐  ┌──────────────────────────┐
│ LLM gRPC │  │ for-hobom-backend gRPC   │
│ (50052)  │  │ SaveStudyMaterial        │
└──────────┘  └────────────┬─────────────┘
                           │ on failure
                           ▼
             ┌──────────────────────────────┐
             │ Redis DLQ (dlq:law:[id])     │
             │ ⚠ Kafka retry not supported  │
             └──────────────────────────────┘
```


## Event Types

| Event Type | Kafka Topic | DLQ Prefix | gRPC Source | Description |
|---|---|---|---|---|
| `MESSAGE` | `hobom.messages` | `dlq:menu:` | for-hobom-backend | User-to-user message delivery |
| `HOBOM_LOG` | `hobom.logs` | `dlq:log:` | for-hobom-backend | API request/response log batches |
| `SPACE_EVENT` | `hobom.space-events` | `dlq:space:` | hobom-space-backend | Space document events |
| `LAW_CHANGED` | N/A (direct) | `dlq:law:` | for-hobom-backend + LLM | Privacy-law study material generation |

## Retry & Error Handling

  1. Polling: every 5 seconds via gRPC, fetches all `PENDING` outbox events.
  2. Publish with retry: up to 3 attempts with exponential backoff (200ms → 400ms).
  3. On success: marks the outbox record as `SENT` via gRPC.
  4. On failure: marks the record as `FAILED` via gRPC and stores the payload in the Redis DLQ (72h TTL).
  5. DLQ replay: call `POST /dlq/retry/:key` to re-publish the payload and remove it from the DLQ.

Log events are published as a single JSON array per poll cycle for efficiency. DLQ entries for log events store individual payloads as single-element arrays to ensure consistent format on retry.
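A minimal sketch of the batch and DLQ payload shapes described above, with a hypothetical `LogEvent` type standing in for the real log payload schema:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LogEvent is a hypothetical payload type for illustration; the real
// log event schema is defined by the backend services.
type LogEvent struct {
	ID   string `json:"id"`
	Body string `json:"body"`
}

// dlqValueFor wraps one failed event in a single-element array so that a
// DLQ replay publishes the same JSON-array shape as a normal poll cycle.
func dlqValueFor(ev LogEvent) []byte {
	b, _ := json.Marshal([]LogEvent{ev})
	return b
}

func main() {
	// One poll cycle: the whole batch goes to Kafka as a single JSON array.
	batch := []LogEvent{{ID: "a", Body: "req"}, {ID: "b", Body: "res"}}
	kafkaValue, _ := json.Marshal(batch)
	fmt.Println(string(kafkaValue))

	// A single failed event is still stored as an array in the DLQ.
	fmt.Println(string(dlqValueFor(batch[0])))
}
```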

LAW_CHANGED: orchestrated directly, without Kafka. The flow is Poll → LLM Generate → Save StudyMaterial → Mark SENT. On failure the payload is stored in the DLQ, but replay through the DLQ retry API is not supported (there is no Kafka topic to republish to).


## DLQ Management API

Base path: `/hobom-event-processor/internal/api/v1`

### List DLQ entries

```shell
# All entries
curl http://localhost:8082/hobom-event-processor/internal/api/v1/dlq

# Filter by prefix
curl "http://localhost:8082/hobom-event-processor/internal/api/v1/dlq?prefix=dlq:log:"
```

### Inspect a DLQ entry

```shell
curl http://localhost:8082/hobom-event-processor/internal/api/v1/dlq/dlq:menu:event-abc
```

### Replay a DLQ entry

```shell
curl -X POST http://localhost:8082/hobom-event-processor/internal/api/v1/dlq/retry/dlq:menu:event-abc
```

### Health check

```shell
curl http://localhost:8082/health
# {"status":"ok","statusCode":200,"message":"Service is healthy"}
```

## Configuration

All configuration is managed via environment variables. For local development, create a `.env` file in the project root (it is already listed in `.gitignore`).

| Variable | Required | Default | Description |
|---|---|---|---|
| `HOBOM_GRPC_ADDR` | Yes | - | for-hobom-backend gRPC address (e.g. `dev-for-hobom-backend:50051`) |
| `HOBOM_GRPC_API_KEY` | Yes | - | API key for gRPC authentication |
| `HOBOM_KAFKA_BROKER` | Yes | - | Kafka broker address (e.g. `kafka:9092`) |
| `HOBOM_REDIS_ADDR` | Yes | - | Redis address (e.g. `redis:6379`) |
| `HOBOM_SPACE_GRPC_ADDR` | No | - | hobom-space-backend gRPC address; SpacePoller only runs when this is set |
| `HOBOM_SPACE_GRPC_API_KEY` | No | `HOBOM_GRPC_API_KEY` | Dedicated API key for space gRPC; falls back to the default API key if unset |
| `HOBOM_LLM_GRPC_ADDR` | No | - | hobom-llm-service-backend gRPC address; LawPoller only runs when this is set |
| `HOBOM_LLM_GRPC_API_KEY` | No | `HOBOM_GRPC_API_KEY` | Dedicated API key for LLM gRPC; falls back to the default API key if unset |
| `HOBOM_HTTP_ADDR` | No | `:8082` | HTTP server listen address |

Kafka publisher defaults (via `DefaultKafkaConfig`): `RequireOne` acks, `LeastBytes` balancer, 10s write timeout.

Production (Docker): `.env` is loaded on the deploy server via `--env-file` (e.g. `/etc/hobom-dev/dev-hobom-event-processor/.env`).


## Running locally

```shell
# 1. Start infrastructure
docker compose -f infra/kafka/docker-compose.yml up -d
docker compose -f infra/redis/docker-compose.yml up -d

# 2. Create .env
cp .env.example .env   # edit values as needed

# 3. Generate protobuf code and run
make run
```

## Development

```shell
# Generate proto files
make proto

# Run tests
go test ./...

# Sync protobuf submodule
make sync-submodule
```

## Graceful Shutdown

On SIGTERM / SIGINT:

  1. Context is cancelled — pollers finish their current poll cycle before stopping.
  2. In-flight poll results are waited on via `sync.WaitGroup`.
  3. The HTTP server shuts down with a 5s timeout.
