Skip to content

Commit 53555d0

Browse files
committed
resolve concurrent job creation race condition
1 parent 4abcb35 commit 53555d0

File tree

4 files changed

+104
-4
lines changed

4 files changed

+104
-4
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
2+
# See the file 'LICENSE' for copying permission.
3+
4+
from django.core.management.base import BaseCommand
5+
from django.db import connection, transaction
6+
from django.db.models import Count
7+
8+
from api_app.models import Job
9+
10+
11+
class Command(BaseCommand):
    """Merge Job rows that share the same ``path`` into a single row.

    For every ``path`` value held by more than one Job, the Job with the
    lowest id is kept (the "winner"). Tags, reports, comments and, when the
    winner has none, the investigation of every other Job (the "losers") are
    moved onto the winner, and the losers are then deleted.
    """

    help = "Fixes Split-Brain scenarios in Jobs by merging duplicate path entries."

    # Reverse relations whose rows are re-pointed from a loser to the winner.
    # Each one is looked up with hasattr() first, so a relation that does not
    # exist on Job is skipped instead of raising.
    _RELATED_MANAGERS = (
        "analyzerreports",
        "connectorreports",
        "visualizerreports",
        "pivotreports",
        "ingestorreports",
        "comments",
    )

    def handle(self, *args, **options):
        """Find every duplicated path and merge its jobs, one transaction per path."""
        # Materialize once: the result is counted and then iterated.
        duplicates = list(
            Job.objects.values("path")
            .annotate(path_count=Count("id"))
            .filter(path_count__gt=1)
        )

        if not duplicates:
            self.stdout.write(self.style.SUCCESS("No split-brain jobs found."))
            return

        self.stdout.write(self.style.WARNING(f"Found {len(duplicates)} duplicate paths."))

        for entry in duplicates:
            path = entry["path"]
            jobs = list(Job.objects.filter(path=path).order_by("id"))
            if len(jobs) < 2:
                # The duplicates disappeared between the two queries.
                continue

            # The oldest row (lowest id) survives; the rest are merged into it.
            winner, *losers = jobs

            self.stdout.write(f"\nMerging into winner Job PK={winner.pk} (path='{path}')")

            # All losers of one path are merged atomically: any error rolls
            # the whole path back instead of leaving it half merged.
            with transaction.atomic():
                for loser in losers:
                    self._merge_loser(winner, loser)

        self.stdout.write(self.style.SUCCESS("\nCleanup complete."))

    def _merge_loser(self, winner, loser):
        """Move everything owned by ``loser`` onto ``winner`` and delete ``loser``.

        Must be called inside ``transaction.atomic()``. Exceptions are not
        caught here on purpose, so that the enclosing transaction rolls back.
        """
        self.stdout.write(f" Processing loser Job PK={loser.pk}...")

        tags = list(loser.tags.all())
        if tags:
            winner.tags.add(*tags)
            # Remove the loser's own tag links: the row is deleted below with
            # raw SQL, which does not clean up many-to-many rows for us.
            loser.tags.clear()
            self.stdout.write(f" Moved {len(tags)} tags")

        for relation in self._RELATED_MANAGERS:
            if not hasattr(loser, relation):
                continue
            moved = getattr(loser, relation).update(job=winner)
            if moved:
                self.stdout.write(f" Moved {moved} {relation}")

        if not winner.investigation and loser.investigation:
            winner.investigation = loser.investigation
            winner.save()
            self.stdout.write(" Transferred investigation")

        loser_id = loser.pk
        # NOTE(review): raw SQL is presumably used to bypass the model-level
        # delete logic, which could also affect the winner sharing this path -
        # confirm. Any other relation still pointing at the loser must be
        # moved or cleared before this statement.
        with connection.cursor() as cursor:
            cursor.execute("DELETE FROM api_app_job WHERE id = %s", [loser_id])
        self.stdout.write(self.style.SUCCESS(f" Deleted ghost Job PK={loser_id}"))
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Generated by Django 4.2.27 on 2026-04-08 18:50
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
    """Add the ``job_path_unique`` unique constraint on the ``path`` field of ``job``."""

    dependencies = [
        ("api_app", "0073_alter_updatecheckstatus_last_checked_at_and_more"),
    ]

    operations = [
        migrations.AddConstraint(
            model_name="job",
            constraint=models.UniqueConstraint(
                fields=("path",),
                name="job_path_unique",
            ),
        ),
    ]

api_app/models.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,12 @@ class Meta:
376376
# WHERE ("api_app_job"."depth" >= ? AND "api_app_job"."path"::text LIKE ? AND NOT ("api_app_job"."id" = ?))
377377
models.Index(fields=["depth", "path", "id"], name="MPNodeSearch"),
378378
]
379+
constraints = [
380+
UniqueConstraint(
381+
fields=["path"],
382+
name="job_path_unique",
383+
)
384+
]
379385

380386
# constants
381387
TLP = TLP

api_app/queryset.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -266,17 +266,25 @@ def create(self, parent=None, **kwargs):
266266
Returns:
267267
The created job.
268268
"""
269-
if parent:
270-
return parent.add_child(**kwargs)
271269
# try multiple times, hoping for no race conditions
272-
total_attempt_number = 5
270+
total_attempt_number = 10
273271
for attempt in range(0, total_attempt_number):
274272
try:
273+
if parent:
274+
if attempt > 0:
275+
parent.refresh_from_db()
276+
return parent.add_child(**kwargs)
275277
return self.model.add_root(**kwargs)
276278
except IntegrityError:
277-
logger.warning(f"Found race condition for {kwargs['name']}. Trying again to calculate path.")
279+
logger.warning(
280+
f"Found race condition for {kwargs.get('analyzable', 'unknown')}. Trying again to calculate path."
281+
)
278282
if attempt == total_attempt_number - 1:
279283
raise
284+
import time
285+
286+
# exponential backoff
287+
time.sleep(0.01 * (2**attempt))
280288

281289
def delete(self, *args, **kwargs):
282290
"""

0 commit comments

Comments
 (0)