Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 49 additions & 6 deletions aspm_cli/commands/scan_command.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import json
from pydantic import ValidationError

from aspm_cli.commands.base_command import BaseCommand
Expand All @@ -20,6 +21,11 @@ def configure_parser(self, parser):
parser.add_argument("--label", help="The label created in AccuKnox for associating scan results.")
parser.add_argument("--token", help="The token for authenticating with the Control Panel.")
parser.add_argument("--tenant", help="Tenant ID [Optional]")
parser.add_argument(
"--project-name",
dest="project_name",
help="Project name (AccuKnox entity) - Required for SBOM uploads",
)
parser.add_argument('--softfail', action='store_true', help='Enable soft fail mode for scanning')
parser.add_argument('--skip-upload', action='store_true', help='Skip control plane upload')

Expand All @@ -40,6 +46,8 @@ def execute(self, args):
"accuknox_label": args.label or os.getenv("ACCUKNOX_LABEL"),
"accuknox_token": args.token or os.getenv("ACCUKNOX_TOKEN"),
"accuknox_tenant": args.tenant or os.getenv("ACCUKNOX_TENANT"),
"accuknox_project_name": args.project_name
or os.getenv("ACCUKNOX_PROJECT"),
}

# Get the correct scanner strategy from the registry
Expand All @@ -65,12 +73,47 @@ def execute(self, args):

# Upload results and handle failure
upload_exit_code = 0
if not skip_upload and result_file:
upload_exit_code = upload_results(result_file, accuknox_config["accuknox_endpoint"], accuknox_config["accuknox_label"], accuknox_config["accuknox_token"], accuknox_config["accuknox_tenant"], scanner.data_type_identifier)
elif result_file and os.path.exists(result_file):
os.remove(result_file)
else:
pass # No result file or skip upload, nothing to do with it
if result_file and os.path.exists(result_file):
# Check if this is SBOM mode (container scan with --generate-sbom)
is_sbom_upload = (
args.scantype.lower() == "container"
and getattr(args, "generate_sbom", False)
)

# If this is an SBOM upload, enrich the SBOM file with project_name and classifier
if is_sbom_upload:
project_name = accuknox_config.get("accuknox_project_name")
try:
with open(result_file, "r", encoding="utf-8") as f:
data = json.load(f)

if isinstance(data, dict):
if project_name:
data["project_name"] = project_name
data["project_classifier"] = "container"

with open(result_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
except Exception as e:
Logger.get_logger().debug(
f"Failed to enrich SBOM results.json with project_name/classifier: {e}"
)

# Upload if not skipping
if not skip_upload:
# Determine data_type: SBOM for SBOM uploads, otherwise use scanner's identifier
data_type = "SBOM" if is_sbom_upload else scanner.data_type_identifier
upload_exit_code = upload_results(
result_file,
accuknox_config["accuknox_endpoint"],
accuknox_config["accuknox_label"],
accuknox_config["accuknox_token"],
accuknox_config["accuknox_tenant"],
data_type,
)
else:
# Clean up result file when skipping upload
os.remove(result_file)
handle_failure(exit_code if exit_code != 0 else upload_exit_code, softfail)

except ValidationError as e:
Expand Down
30 changes: 28 additions & 2 deletions aspm_cli/scan/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ class ContainerScanner:
ak_container_image = os.getenv("SCAN_IMAGE", "public.ecr.aws/k9v9d5v2/aquasec/trivy:0.65.0")
result_file = './results.json'

def __init__(self, command, container_mode=False):
def __init__(self, command, container_mode=False, generate_sbom: bool = False):
self.command = command
self.container_mode = container_mode
self.generate_sbom = generate_sbom

def run(self):
try:
Expand All @@ -37,6 +38,12 @@ def run(self):
sanitized_stderr = result.stderr.replace("trivy", "[scanner]")
Logger.get_logger().error(sanitized_stderr)

if self.generate_sbom:
# SBOM mode: Always use self.result_file (forced to ./results.json)
if os.path.exists(self.result_file):
return result.returncode, self.result_file
return result.returncode, None

if not os.path.exists(self.result_file):
return config.SOMETHING_WENT_WRONG_RETURN_CODE, None

Expand All @@ -54,8 +61,26 @@ def _build_container_scan_args(self):
"""
Parses the raw command, strips forbidden arguments, and enforces
the required output format and file. This ensures the class can
reliably find the JSON output.
reliably find the JSON output for vulnerability scans.
"""
if self.generate_sbom:
# SBOM mode: Force cyclonedx format and JSON output
original_args = shlex.split(self.command or "")
# Strip any existing format/output flags
flags_to_strip = {"-o", "--output", "-f", "--format"}
sanitized_args = []
i = 0
while i < len(original_args):
arg = original_args[i]
if arg in flags_to_strip:
i += 2 # Skip flag and value
continue
sanitized_args.append(arg)
i += 1
# Force cyclonedx format and JSON output
sanitized_args.extend(["-f", "cyclonedx", "-o", self.result_file])
return None, sanitized_args

# Flags that take a value and should be removed.
flags_to_strip = {"-s", "--severity", "-o", "--output", "-f", "--format", "--exit-code", "--quiet"}
severity_threshold = None
Expand Down Expand Up @@ -109,3 +134,4 @@ def _severity_threshold_met(self, severity_threshold):
except Exception as e:
Logger.get_logger().error(f"Error reading scan results: {e}")
raise

8 changes: 7 additions & 1 deletion aspm_cli/scanners/container_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ def add_arguments(self, parser: argparse.ArgumentParser):
action="store_true",
help="Run in container mode"
)
parser.add_argument(
"--generate-sbom",
action="store_true",
help="Generate SBOM instead of running a vulnerability scan"
)

def validate_config(self, args: argparse.Namespace, validator: ConfigValidator):
validator.validate_container_scan(args.command, args.container_mode)

def run_scan(self, args: argparse.Namespace) -> tuple[int, str]:
# Instantiate and run the original scanner logic
scanner = OriginalContainerScanner(args.command, args.container_mode)
generate_sbom = getattr(args, "generate_sbom", False)
scanner = OriginalContainerScanner(args.command, args.container_mode, generate_sbom=generate_sbom)
return scanner.run()
21 changes: 19 additions & 2 deletions aspm_cli/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
# Moved from original main.py
ALLOWED_SCAN_TYPES = ["iac", "sq-sast", "secret", "container", "sast", "dast"]

def _build_endpoint_url(endpoint, api_path):
"""
Build the full URL for API requests.
If endpoint already includes protocol (http:// or https://), use it as-is.
Otherwise, prepend https:// for production endpoints.
"""
if endpoint.startswith(("http://", "https://")):
return f"{endpoint}{api_path}"
return f"https://{endpoint}{api_path}"

def clean_env_vars():
"""Removes surrounding quotes from all environment variables."""
for key, value in os.environ.items():
Expand All @@ -34,16 +44,22 @@ def print_banner():
def upload_results(file_path, endpoint, label, token, tenant_id, data_type):
upload_exit_code = 1
"""Uploads scan results to the AccuKnox endpoint."""

if not data_type:
Logger.get_logger().error("data_type is required for artifact uploads")
return upload_exit_code

if not os.path.exists(file_path):
Logger.get_logger().warning(f"Result file not found: {file_path}. Skipping upload.")
return
return upload_exit_code

Logger.get_logger().info(f"Uploading scan results from {file_path} to {endpoint}...")
headers = {
"Authorization": f"Bearer {token}"
}
if tenant_id:
headers["Tenant-Id"] = tenant_id
api_path = "/api/v1/artifact/"
params = {
"data_type": data_type,
"label_id": label
Expand All @@ -56,8 +72,9 @@ def upload_results(file_path, endpoint, label, token, tenant_id, data_type):
spinner.start()

with open(file_path, 'rb') as file:
url = _build_endpoint_url(endpoint, api_path)
response = requests.post(
f"https://{endpoint}/api/v1/artifact/",
url,
headers=headers,
params=params,
files={"file": file},
Expand Down
14 changes: 13 additions & 1 deletion aspm_cli/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ class AccuknoxConfig(BaseModel):
accuknox_label: Optional[str] = Field(None, env="ACCUKNOX_LABEL", description="AccuKnox label for scan results")
accuknox_token: Optional[str] = Field(None, env="ACCUKNOX_TOKEN", description="AccuKnox authentication token")
skip_upload: bool = Field(False)
accuknox_tenant: Optional[int]
accuknox_tenant: Optional[int] = None
accuknox_project_name: Optional[str] = Field(
None,
env="ACCUKNOX_PROJECT_NAME",
description="AccuKnox project name (required for SBOM uploads)",
)

@model_validator(mode='after')
def check_env_vars_and_upload_requirements(self) -> 'AccuknoxConfig':
Expand All @@ -30,6 +35,13 @@ def check_env_vars_and_upload_requirements(self) -> 'AccuknoxConfig':
if self.accuknox_token is None and "ACCUKNOX_TOKEN" in os.environ:
self.accuknox_token = os.environ["ACCUKNOX_TOKEN"]

# Prefer ACCUKNOX_PROJECT_NAME, but fall back to legacy ACCUKNOX_PROJECT for backwards compatibility
if self.accuknox_project_name is None:
if "ACCUKNOX_PROJECT_NAME" in os.environ:
self.accuknox_project_name = os.environ["ACCUKNOX_PROJECT_NAME"]
elif "ACCUKNOX_PROJECT" in os.environ:
self.accuknox_project_name = os.environ["ACCUKNOX_PROJECT"]

# Check required for upload (equivalent to V1 validator for multiple fields)
if not self.skip_upload:
if self.accuknox_endpoint is None:
Expand Down