Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions dojo/tools/api_sonarqube/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,60 @@ def find_hotspots(self, project_key, organization=None, branch=None):

return hotspots

def find_sca_risks(self, component_key, organization=None, branch=None):
"""
Search for SCA dependency risks.
:param component_key: component key
:return:
"""
page = 1
max_page = 100
risks = []

while page <= max_page:
request_filter = {
"component": component_key,
"pageIndex": page,
"pageSize": 500,
}

if branch:
request_filter["branch"] = branch

if organization:
request_filter["organization"] = organization
elif self.org_id:
request_filter["organization"] = self.org_id

response = self.session.get(
url=f"{self.sonar_api_url.replace('/api', '/api/v2')}/sca/risk-reports",
params=request_filter,
headers=self.default_headers,
timeout=settings.REQUESTS_TIMEOUT,
)

if not response.ok:
msg = (
f"Unable to find SCA risks for component {component_key} "
f"due to {response.status_code} - {response.content}"
)
raise Exception(msg)

response_data = response.json()
# SCA API v2 may return paginated response or flat array
if isinstance(response_data, list):
# Flat array response (no pagination metadata)
risks.extend(response_data)
break
# Paginated response with issuesReleases array
risks_page = response_data.get("issuesReleases", [])
if not risks_page:
break
risks.extend(risks_page)
page += 1

return risks

def get_issue(self, issue_key):
"""
Search for issues.
Expand Down
135 changes: 135 additions & 0 deletions dojo/tools/api_sonarqube/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ def get_findings(self, filename, test):
items.extend(self.import_hotspots(test))
else:
items = self.import_hotspots(test)
if getattr(settings, "SONARQUBE_API_PARSER_SCA", True):
if items:
items.extend(self.import_sca(test))
else:
items = self.import_sca(test)
return items

@staticmethod
Expand Down Expand Up @@ -336,6 +341,121 @@ def import_hotspots(self, test):

return items

def import_sca(self, test):
try:
items = []
client, config = self.prepare_client(test)
# Get the value in the service key 2 box
organization = (
config.service_key_2
if (config and config.service_key_2)
else None
)
# Get the value in the service key 1 box
if config and config.service_key_1:
component_key = config.service_key_1
else:
component = client.find_project(
test.engagement.product.name,
organization=organization,
branch=test.branch_tag,
)
component_key = component["key"]

sca_risks = client.find_sca_risks(
component_key,
organization=organization,
branch=test.branch_tag,
)
logger.info(
f"Found {len(sca_risks)} SCA risks for component {component_key}",
)

for risk in sca_risks:
# Skip if status is not OPEN
if risk.get("riskStatus") != "OPEN":
continue

# Extract fields
title = risk.get("riskTitle", "Unknown SCA Risk")
vulnerability_id = risk.get("vulnerabilityId")
cvss_score = risk.get("cvssScore")
cwe_ids = risk.get("cweIds", [])
cwe = int(cwe_ids[0].replace("CWE-", "")) if cwe_ids else None
package_url = risk.get("packageUrl", "")
dependency_chains = risk.get("dependencyChains", [])
published_on = risk.get("publishedOn")
severity = self.convert_sca_severity(risk.get("riskSeverity", "INFO"))

# Parse component name and version from packageUrl (pkg:maven/group/artifact@version)
component_name = None
component_version = None
if package_url:
try:
# Extract after pkg:type/
parts = package_url.split("/")
if len(parts) >= 2:
last_part = parts[-1]
if "@" in last_part:
name_part = "/".join(parts[1:]).split("@")[0]
component_name = name_part
component_version = last_part.split("@")[1]
else:
component_name = "/".join(parts[1:])
except Exception:
component_name = package_url

# Build description
description = f"**Vulnerability:** {vulnerability_id}\n"
description += f"**Package:** {package_url}\n"
if cvss_score:
description += f"**CVSS Score:** {cvss_score}\n"
if cwe_ids:
description += f"**CWE:** {', '.join(cwe_ids)}\n"
if published_on:
description += f"**Published:** {published_on}\n"
if dependency_chains:
description += "\n**Dependency Chains:**\n"
for chain in dependency_chains:
description += " → ".join(chain) + "\n"

find = Finding(
title=title,
cwe=cwe,
description=description,
test=test,
severity=severity,
component_name=component_name,
component_version=component_version,
cvssv3_score=cvss_score,
verified=True,
false_p=False,
duplicate=False,
out_of_scope=False,
static_finding=True,
unique_id_from_tool=f"sca:{vulnerability_id}:{package_url}",
)

if vulnerability_id:
find.unsaved_vulnerability_ids = [vulnerability_id]
else:
find.unsaved_vulnerability_ids = []

items.append(find)

except Exception as e:
logger.exception("SonarQube SCA API import issue")
create_notification(
event="sonarqube_failed",
title="SonarQube SCA API import issue",
description=e,
icon="exclamation-triangle",
source="SonarQube API",
obj=test.engagement.product,
)

return items

@staticmethod
def clean_rule_description_html(raw_html):
search = re.search(
Expand Down Expand Up @@ -380,6 +500,21 @@ def convert_scanner_confidence(sonar_scanner_confidence):
return 7
return 7

@staticmethod
def convert_sca_severity(sca_severity):
sev = sca_severity.upper()
if sev == "BLOCKER":
return "Critical"
if sev == "CRITICAL":
return "Critical"
if sev == "HIGH":
return "High"
if sev == "MEDIUM":
return "Medium"
if sev == "LOW":
return "Low"
return "Info"

@staticmethod
def get_references(vuln_details):
parser = etree.HTMLParser()
Expand Down
Loading
Loading