Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from vulnerabilities.pipelines import pysec_importer
from vulnerabilities.pipelines.v2_importers import aosp_importer as aosp_importer_v2
from vulnerabilities.pipelines.v2_importers import apache_httpd_importer as apache_httpd_v2
from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2
from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2
from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2
from vulnerabilities.pipelines.v2_importers import (
Expand Down Expand Up @@ -78,6 +79,7 @@
IMPORTERS_REGISTRY = create_registry(
[
archlinux_importer_v2.ArchLinuxImporterPipeline,
apache_kafka_importer_v2.ApacheKafkaImporterPipeline,
nvd_importer_v2.NVDImporterPipeline,
elixir_security_importer_v2.ElixirSecurityImporterPipeline,
npm_importer_v2.NpmImporterPipeline,
Expand Down
136 changes: 136 additions & 0 deletions vulnerabilities/pipelines/v2_importers/apache_kafka_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging
from datetime import timezone
from traceback import format_exc as traceback_format_exc
from typing import Iterable

import requests
from bs4 import BeautifulSoup
from dateutil.parser import parse
from packageurl import PackageURL
from univers.version_range import ApacheVersionRange

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.models import AdvisoryReference
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.pipes.apache_kafka import get_original_advisory
from vulnerabilities.pipes.apache_kafka import parse_range
from vulnerabilities.pipes.apache_kafka import parse_summary
from vulnerabilities.utils import build_description


class ApacheKafkaImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    Import Apache Kafka advisories from the CVE list page at ``url``.

    Each ``<table>`` found under the page's ``td-content`` element is turned
    into one advisory; the ``<h2>`` heading that precedes the table supplies
    the CVE id, the title and the reference link.
    """

    pipeline_id = "apache_kafka_importer_v2"
    spdx_license_expression = "Apache-2.0"
    importer_name = "Apache Kafka Importer V2"

    license_url = "https://www.apache.org/licenses/"
    url = "https://kafka.apache.org/community/cve-list/"

    # Seconds to wait for the CVE list page before the fetch step gives up.
    request_timeout = 60

    # CVEs whose "Versions affected" / "Fixed versions" cells are never parsed
    # into version ranges; their advisories carry no affected package.
    # NOTE(review): presumably these cells do not hold plain Kafka version
    # ranges -- confirm against the upstream page.
    cve_without_affected_fixed_range = [
        "CVE-2022-23302",
        "CVE-2022-23305",
        "CVE-2022-23307",
        "CVE-2021-45046",
        "CVE-2021-44228",
        "CVE-2021-4104",
    ]

    @classmethod
    def steps(cls):
        """Return the pipeline steps in execution order."""
        return (
            cls.fetch,
            cls.collect_and_store_advisories,
        )

    def fetch(self):
        """Download the CVE list page and parse it into ``self.soup``."""
        self.log(f"Fetch `{self.url}`")
        response = requests.get(self.url, timeout=self.request_timeout)
        # Fail here on an HTTP error instead of trying to scrape an error page.
        response.raise_for_status()
        self.advisory_data = response.text
        self.soup = BeautifulSoup(self.advisory_data, features="lxml")

    def _advisory_tables(self):
        """Return every advisory ``<table>`` of the fetched page."""
        return self.soup.find(class_="td-content").find_all("table")

    def advisories_count(self):
        """Return the number of advisory tables on the fetched page."""
        return len(self._advisory_tables())

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """Yield one ``AdvisoryData`` per advisory table on the fetched page."""
        for table in self._advisory_tables():
            yield self.to_advisory_data(table)

    def _parse_version_range(self, cve, raw_range):
        """
        Return an ``ApacheVersionRange`` built from the ``raw_range`` text.

        Return None when the text yields no constraint or cannot be parsed;
        a parse failure is logged together with the ``cve`` it belongs to.
        """
        try:
            constraints = parse_range(raw_range)
            return ApacheVersionRange(constraints=constraints) if constraints else None
        except Exception as e:
            self.log(
                f"Failed to parse Kafka range for: {cve} with error {e!r}:\n{traceback_format_exc()}",
                level=logging.ERROR,
            )
            return None

    def to_advisory_data(self, table) -> AdvisoryData:
        """Return the ``AdvisoryData`` described by ``table`` and its ``<h2>`` heading."""
        affected_version_range = None
        fixed_version_range = None
        affected_packages = []
        references = []

        cve_h2 = table.find_previous("h2")
        # Fall back to an empty mapping so ``.get`` works without an anchor.
        reference_a = cve_h2.find("a") or {}
        title = cve_h2.text
        ref_url = reference_a.get("href")
        cve = cve_h2.get("id")

        raw_affected = table.find(text="Versions affected").find_next("p").text
        raw_fixed = table.find(text="Fixed versions").find_next("p").text
        raw_date = table.find(text="Issue announced").find_next("p").text
        # Any timezone produced by the parser is overridden with UTC.
        date_published = parse(raw_date).replace(tzinfo=timezone.utc)

        description = parse_summary(cve_h2, table)
        original_advisory = get_original_advisory(cve_h2, table)

        if cve not in self.cve_without_affected_fixed_range:
            # Each range is parsed on its own so that one unparsable cell
            # does not discard the other range or abort the advisory.
            affected_version_range = self._parse_version_range(cve, raw_affected)
            fixed_version_range = self._parse_version_range(cve, raw_fixed)

        if affected_version_range or fixed_version_range:
            affected_packages.append(
                AffectedPackageV2(
                    package=PackageURL(type="apache", name="kafka"),
                    affected_version_range=affected_version_range,
                    fixed_version_range=fixed_version_range,
                )
            )

        # ``ref_url`` is None when the heading has no anchor or the anchor has
        # no ``href``; skip the reference rather than build one without a URL.
        if ref_url:
            references.append(
                ReferenceV2(
                    reference_id=cve,
                    reference_type=AdvisoryReference.OTHER,
                    url=ref_url,
                )
            )

        return AdvisoryData(
            advisory_id=cve,
            aliases=[],
            summary=build_description(summary=title, description=description),
            date_published=date_published,
            affected_packages=affected_packages,
            references_v2=references,
            url=f"{self.url}#{cve}",
            original_advisory_text=original_advisory,
        )
73 changes: 73 additions & 0 deletions vulnerabilities/pipes/apache_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#


from univers.version_constraint import VersionConstraint
from univers.versions import SemverVersion


def get_original_advisory(cve_h2, table):
    """
    Return the markup of one advisory as a single string.

    The result is ``str(cve_h2)`` followed by the serialization of the tags
    met while walking ``cve_h2.next_elements``, up to and including ``table``.
    Serializing a tag already includes everything nested in it, so descendants
    of an already serialized tag are skipped instead of being emitted twice.
    Elements without a tag name (bare strings) are never emitted on their own.

    ``table`` must be the very object reachable from ``cve_h2``: the walk
    stops on identity, not on equality.
    """
    segments = [str(cve_h2)]
    # Most recently serialized tag. The walk is in document order, so the
    # descendants of this tag are exactly the elements that come next until
    # the walk leaves its subtree; tracking only the latest one is enough.
    serialized = cve_h2

    for el in cve_h2.next_elements:
        is_table = el is table
        if getattr(el, "name", None) and not any(
            parent is serialized for parent in el.parents
        ):
            segments.append(str(el))
            serialized = el
        if is_table:
            break

    return "".join(segments)


def parse_summary(cve_h2, table):
    """
    Return the text of the ``<p>`` elements found between ``cve_h2`` and ``table``.

    ``cve_h2.next_elements`` is walked until an element equal to ``table`` is
    met. The text of every element named ``p`` seen before that point is
    collected, each one followed by a single space (so a non-empty summary
    ends with a space). An empty string is returned when there is none.
    """
    paragraphs = []
    for node in cve_h2.next_elements:
        if node == table:
            break
        if node.name != "p":
            continue
        paragraphs.append(f"{node.text} ")

    return "".join(paragraphs)


def parse_range(raw_range):
    """
    Return a list of ``VersionConstraint`` parsed from the ``raw_range`` text.

    Anything up to the first ":" is dropped. "to" is rewritten as "-", and
    "and" and "later" are removed. The remaining text is split on commas:
    an entry without "-" becomes one "=" constraint, an entry of the form
    "A - B" becomes a ">=A" constraint followed by a "<=B" constraint.
    Blank entries are ignored. An entry holding more than one "-" raises
    ``ValueError`` from the two-name unpacking.

    NOTE(review): because "and later" is simply removed, "X and later" ends
    up as "=X" rather than ">=X" -- confirm this is intended.
    """
    _, separator, tail = raw_range.partition(":")
    text = tail if separator else raw_range
    for needle, substitute in (("to", "-"), ("and", ""), ("later", "")):
        text = text.replace(needle, substitute)

    constraints = []
    for chunk in text.strip().split(","):
        chunk = chunk.strip()
        if not chunk:
            continue

        if "-" in chunk:
            lower, upper = chunk.split("-")
            bounds = ((">=", lower.strip()), ("<=", upper.strip()))
        else:
            bounds = (("=", chunk),)

        constraints.extend(
            VersionConstraint(comparator=comparator, version=SemverVersion(version))
            for comparator, version in bounds
        )

    return constraints
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#


from pathlib import Path
from unittest.mock import patch

from bs4 import BeautifulSoup
from django.test import TestCase

from vulnerabilities.models import AdvisoryV2
from vulnerabilities.pipelines.v2_importers.apache_kafka_importer import ApacheKafkaImporterPipeline
from vulnerabilities.tests import util_tests
from vulnerabilities.tests.pipelines import TestLogger

TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "apache_kafka"


class TestApacheKafkaImporterPipeline(TestCase):
    """Run the Apache Kafka V2 importer against a saved copy of the CVE list page."""

    def setUp(self):
        self.logger = TestLogger()

    @patch(
        "vulnerabilities.pipelines.v2_importers.apache_kafka_importer.ApacheKafkaImporterPipeline.fetch"
    )
    def test_apache_kafka_advisories_v2(self, mock_fetch):
        """Stored advisories must match the expected JSON for the saved page."""
        # NOTE(review): presumably the pipeline runner reads ``__name__`` of
        # each step, which the patched mock lacks by default -- confirm.
        mock_fetch.__name__ = "fetch"
        cve_list = TEST_DATA / "cve-list-2026_01_23.html"
        # ``read_text`` closes the file, unlike a bare ``open(...).read()``.
        advisory_data = cve_list.read_text()

        pipeline = ApacheKafkaImporterPipeline()
        # ``fetch`` is patched out, so the parsed page is injected directly.
        pipeline.soup = BeautifulSoup(advisory_data, features="lxml")
        pipeline.log = self.logger.write
        pipeline.execute()

        expected_file = TEST_DATA / "cve-list-2026_01_23-expected.json"
        result = [adv.to_advisory_data().to_dict() for adv in AdvisoryV2.objects.all()]
        util_tests.check_results_against_json(result, expected_file)
93 changes: 93 additions & 0 deletions vulnerabilities/tests/pipes/test_apache_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#


from pathlib import Path
from unittest import TestCase

from bs4 import BeautifulSoup

from vulnerabilities.pipes.apache_kafka import get_original_advisory
from vulnerabilities.pipes.apache_kafka import parse_range
from vulnerabilities.pipes.apache_kafka import parse_summary
from vulnerabilities.tests.pipelines import TestLogger

TEST_DATA = Path(__file__).parent.parent / "test_data" / "apache_kafka"


class TestPipeApacheKafka(TestCase):
    """Unit tests for the helpers in ``vulnerabilities.pipes.apache_kafka``."""

    def setUp(self):
        self.logger = TestLogger()
        cve_list = TEST_DATA / "cve-list-2026_01_23.html"
        # ``read_text`` closes the file, unlike a bare ``open(...).read()``.
        advisory_data = cve_list.read_text()
        soup = BeautifulSoup(advisory_data, features="lxml")
        # One table per advisory, in page order.
        self.tables = list(soup.find(class_="td-content").find_all("table"))

    def test_vulnerability_pipes_apache_kafka_get_summary(self):
        table = self.tables[0]
        cve_h2 = table.find_previous("h2")

        result = parse_summary(
            cve_h2=cve_h2,
            table=table,
        )
        # The trailing space is part of the contract: every paragraph is
        # followed by one space.
        expected = (
            "In CVE-2023-25194, we announced the RCE/Denial of service attack via SASL "
            "JAAS JndiLoginModule configuration in Kafka Connect API. But not only Kafka "
            "Connect API is vulnerable to this attack, the Apache Kafka brokers also have "
            "this vulnerability. To exploit this vulnerability, the attacker needs to be "
            "able to connect to the Kafka cluster and have the AlterConfigs permission on "
            "the cluster resource. Since Apache Kafka 3.4.0, we have added a system property "
            '("-Dorg.apache.kafka.disallowed.login.modules") to disable the problematic login '
            "modules usage in SASL JAAS configuration. Also by default "
            "“com.sun.security.auth.module.JndiLoginModule” is disabled in Apache Kafka 3.4.0, "
            "and “com.sun.security.auth.module.JndiLoginModule,com.sun.security.auth.module.LdapLoginModule” "
            "is disabled by default in Apache Kafka 3.9.1/4.0.0. "
        )
        self.assertEqual(result, expected)

    def test_vulnerability_pipes_apache_kafka_get_original_advisory(self):
        table = self.tables[0]
        cve_h2 = table.find_previous("h2")

        result = get_original_advisory(
            cve_h2=cve_h2,
            table=table,
        )

        # Both the heading and the table must be part of the advisory text.
        self.assertIn('id="CVE-2025-27819"', result)
        self.assertIn("<p>2.0.0 - 3.3.2</p>", result)

    def test_vulnerability_pipes_apache_kafka_parse_range(self):
        affected = "2.8.0 - 2.8.1, 3.0.0 - 3.0.1, 3.1.0 - 3.1.1, 3.2.0 - 3.2.1"

        result_affected = parse_range(affected)
        result_affected = [str(const) for const in result_affected]
        expected_affected = [
            ">=2.8.0",
            "<=2.8.1",
            ">=3.0.0",
            "<=3.0.1",
            ">=3.1.0",
            "<=3.1.1",
            ">=3.2.0",
            "<=3.2.1",
        ]

        self.assertCountEqual(result_affected, expected_affected)

    def test_vulnerability_pipes_apache_kafka_parse_range_dirty_range(self):
        # Everything up to the first ":" is a label and must be ignored.
        affected = "Apache Kafka Connect API (connect-api,connect-runtime) : 2.3.0 - 3.3.2"

        result_affected = parse_range(affected)
        result_affected = [str(const) for const in result_affected]
        expected_affected = [">=2.3.0", "<=3.3.2"]

        self.assertCountEqual(result_affected, expected_affected)
Loading