Skip to content

Commit

Permalink
feat: "language" parser for rpm files
Browse files Browse the repository at this point in the history
* fixes intel#2916

Signed-off-by: Bartlomiej Cieszkowski <[email protected]>
Signed-off-by: Przemyslaw Romaniak <[email protected]>
  • Loading branch information
bcieszko committed May 1, 2023
1 parent 95a48fd commit 061a8bb
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 0 deletions.
1 change: 1 addition & 0 deletions cve_bin_tool/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"swift",
"php",
"perl",
"rpm"
]


Expand Down
2 changes: 2 additions & 0 deletions cve_bin_tool/parsers/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cve_bin_tool.parsers.php import PhpParser
from cve_bin_tool.parsers.python import PythonParser, PythonRequirementsParser
from cve_bin_tool.parsers.r import RParser
from cve_bin_tool.parsers.rpm import RpmParser
from cve_bin_tool.parsers.ruby import RubyParser
from cve_bin_tool.parsers.rust import RustParser
from cve_bin_tool.parsers.swift import SwiftParser
Expand All @@ -25,6 +26,7 @@
"Package.resolved": SwiftParser,
"composer.lock": PhpParser,
"cpanfile": PerlParser,
".rpm:": RpmParser,
}


Expand Down
230 changes: 230 additions & 0 deletions cve_bin_tool/parsers/rpm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
# Copyright (C) 2023 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later

import io
from enum import IntEnum

from cve_bin_tool.parsers import Parser


class RpmParser(Parser):
    """Parser that extracts the package name and version from an RPM file.

    Only the lead, the signature section intro and the main header index are
    read; the payload is never touched.

    More details about the rpm structure can be found here:
    https://rpm-software-management.github.io/rpm/manual/format.html
    """

    class Type(IntEnum):
        """Data type identifiers used by header index entries."""

        NULL = 0
        CHAR = 1
        INT8 = 2
        INT16 = 3
        INT32 = 4
        INT64 = 5
        STRING = 6
        BIN = 7
        STRING_ARRAY = 8
        I18NSTRING_TYPE = 9

    class Tag(IntEnum):
        """Header tags this parser knows about."""

        RPMTAG_NAME = 1000
        RPMTAG_VERSION = 1001

    # Tags whose values are collected by extract_info()
    TAGS_TO_PARSE = [Tag.RPMTAG_NAME, Tag.RPMTAG_VERSION]

    RPM_LEAD_MAGIC = b"\xed\xab\xee\xdb"
    RPM_HEADER_MAGIC = b"\x8e\xad\xe8"
    RPM_LEAD_LEN = 96
    RPM_LEAD_NAME_OFFSET = 10
    RPM_LEAD_NAME_LEN = 66
    RPM_HEADER_LEN = 16
    RPM_HEADER_INDEX_LEN = 16

    def __init__(self, cve_db, logger, validate=True):
        """Store the database/logger via the base class and the validate flag.

        When ``validate`` is true, run_checker() checks the lead magic before
        attempting to parse the file.
        """
        super().__init__(cve_db, logger)
        self.validate = validate

    def validate_rpm(self, filename):
        """Return True when ``filename`` starts with the RPM lead magic."""
        with open(filename, "rb") as rpm:
            return rpm.read(len(self.RPM_LEAD_MAGIC)) == self.RPM_LEAD_MAGIC

    def get_rpm_entry(self, rpm, rpm_size, base_offset, entry_type, offset, count):
        """Read one entry value from the header data store.

        ``rpm`` is the open binary file, ``rpm_size`` its total size,
        ``base_offset`` the start of the data store and ``offset``/``count``
        the values taken from the index entry. The file position is moved;
        callers that need it must save and restore it themselves.

        Returns the decoded string for STRING entries, or None when the entry
        is out of bounds, unterminated, not ASCII, or of an unsupported type.
        """
        if rpm_size < (base_offset + offset + count):
            self.logger.error(f"{self.filename} - entry corrupted")
            return None
        rpm.seek(base_offset + offset)

        if entry_type != self.Type.STRING:
            # unsupported - if more info is needed feel free to add parsing here
            # at the moment all the data that is extracted is string
            return None

        # string can only have count 1; it is terminated by a NUL byte
        collected = bytearray()
        while True:
            char = rpm.read(1)
            if not char:
                # read() returns b"" at EOF: the string is not terminated,
                # stop instead of looping forever
                self.logger.error(f"{self.filename} - entry corrupted")
                return None
            if char == b"\x00":
                break
            collected += char

        data = bytes(collected)
        try:
            return data.decode("ascii")
        except UnicodeError:
            self.logger.error(
                f"{self.filename} - {data} - invalid string in rpm with nonascii characters at offset 0x{base_offset+offset:X}"
            )
            return None

    @staticmethod
    def _align8(position):
        """Round ``position`` up to the next multiple of 8."""
        remainder = position % 8
        return position + (8 - remainder) if remainder else position

    def _read_header_intro(self, rpm, section):
        """Read the 16-byte intro shared by the signature and header sections.

        Layout:
            3 bytes magic
            1 byte version
            4 bytes reserved
            4 bytes number of index entries
            4 bytes data size

        ``section`` is "signature" or "header" and only selects the log
        messages. Returns ``(entries, data_size)`` or None on a short read or
        wrong magic.
        """
        header = rpm.read(self.RPM_HEADER_LEN)
        if len(header) != self.RPM_HEADER_LEN:
            self.logger.error(
                f"{self.filename} - file is too short, possibly corrupted"
            )
            return None

        if header[0:3] != self.RPM_HEADER_MAGIC:
            if section == "signature":
                self.logger.error(f"{self.filename} - corrupted RPM signature header")
            else:
                self.logger.error(f"{self.filename} - corrupted RPM header - {header}")
            return None

        entries = int.from_bytes(header[8:12], byteorder="big")
        data_size = int.from_bytes(header[12:16], byteorder="big")
        self.logger.debug(f"{section} index entries: {entries}")
        return entries, data_size

    def extract_info(self):
        """Parse ``self.filename`` and return the values of TAGS_TO_PARSE.

        Returns a dict mapping tag number to the decoded value (None when a
        value could not be read), or None when the file is truncated or
        structurally corrupted. Also sets ``self.name`` from the lead.
        """
        # File structure is as follows:
        # Lead
        # Signature
        # Header
        # Payload

        with open(self.filename, "rb") as rpm:
            rpm.seek(0, io.SEEK_END)
            rpm_size = rpm.tell()
            rpm.seek(0)

            # Lead
            rpm_lead = rpm.read(self.RPM_LEAD_LEN)
            if len(rpm_lead) != self.RPM_LEAD_LEN:
                # file corrupted
                self.logger.error(
                    f"{self.filename} - file is too short, possibly corrupted"
                )
                return None
            # The name field spans RPM_LEAD_NAME_LEN bytes starting at
            # RPM_LEAD_NAME_OFFSET; keep everything up to the first NUL.
            name_end = self.RPM_LEAD_NAME_OFFSET + self.RPM_LEAD_NAME_LEN
            name_bytes = rpm_lead[self.RPM_LEAD_NAME_OFFSET : name_end]
            try:
                self.name = name_bytes.split(b"\x00", 1)[0].decode("ascii")
            except UnicodeError:
                self.logger.error(
                    f"{self.filename} - invalid name in rpm with nonascii characters"
                )
                return None

            self.logger.debug(f"{self.filename} - RPM Lead OK")
            self.logger.debug(f"{self.filename} - {self.name}")

            # Signature - same structure as the header:
            # 16 bytes intro, n * 16 bytes index entries, then the data store
            intro = self._read_header_intro(rpm, "signature")
            if intro is None:
                return None
            entries, data_size = intro

            # skip signature indexes and data;
            # the header that follows is aligned to an 8-byte boundary
            target_offset = self._align8(
                rpm.tell() + entries * self.RPM_HEADER_INDEX_LEN + data_size
            )
            if target_offset > rpm_size:
                self.logger.error(f"{self.filename} - corrupted RPM")
                return None

            rpm.seek(target_offset)

            # Header
            intro = self._read_header_intro(rpm, "header")
            if intro is None:
                return None
            entries, data_size = intro

            header_entries_offset = rpm.tell()
            data_offset = header_entries_offset + (entries * self.RPM_HEADER_INDEX_LEN)
            # Only the signature section is padded, so the end of the header
            # data store is checked without rounding up.
            if data_offset + data_size > rpm_size:
                self.logger.error(f"{self.filename} - corrupted RPM")
                return None

            # Index Entry
            # 4 bytes Tag
            # 4 bytes Type
            # 4 bytes Offset
            # 4 bytes Count
            # Parse through index entries
            rpm_info = {}
            entries_tags = self.TAGS_TO_PARSE.copy()
            for _ in range(entries):
                entry_raw = rpm.read(self.RPM_HEADER_INDEX_LEN)
                if len(entry_raw) != self.RPM_HEADER_INDEX_LEN:
                    self.logger.error(
                        f"{self.filename} - file is too short, possibly corrupted"
                    )
                    return None

                entry_tag = int.from_bytes(entry_raw[0:4], byteorder="big")
                if entry_tag not in entries_tags:
                    continue
                entries_tags.remove(entry_tag)

                raw_type = int.from_bytes(entry_raw[4:8], byteorder="big")
                entry_offset = int.from_bytes(entry_raw[8:12], byteorder="big")
                entry_count = int.from_bytes(entry_raw[12:16], byteorder="big")

                try:
                    entry_type = self.Type(raw_type)
                except ValueError:
                    # type value outside the known range: record the tag as
                    # unreadable instead of aborting the whole scan
                    self.logger.error(
                        f"{self.filename} - unknown entry type {raw_type} for tag {entry_tag}"
                    )
                    rpm_info[entry_tag] = None
                else:
                    # get_rpm_entry() moves the file position; come back to
                    # the index afterwards
                    restore_offset = rpm.tell()
                    rpm_entry = self.get_rpm_entry(
                        rpm,
                        rpm_size,
                        data_offset,
                        entry_type,
                        entry_offset,
                        entry_count,
                    )
                    rpm.seek(restore_offset)
                    self.logger.debug(
                        f"{entry_tag} - {entry_type} - {entry_offset} - {entry_count} - data: {rpm_entry}"
                    )
                    rpm_info[entry_tag] = rpm_entry

                if not entries_tags:
                    # we got all the info we need
                    break

            self.logger.debug(f"{rpm_info}")
            return rpm_info

    def run_checker(self, filename):
        """Process RPM file and extract product.

        Generator: yields whatever ``find_vendor`` returns for the package
        name and version found in the file (``find_vendor`` is not defined in
        this class; presumably inherited from ``Parser``).
        """
        self.filename = filename
        continue_processing = True
        if self.validate:
            continue_processing = self.validate_rpm(self.filename)
            self.logger.debug(f"Validation of {filename} - {continue_processing}")
        if continue_processing:
            rpm_info = self.extract_info()
            if rpm_info:
                name = rpm_info.get(self.Tag.RPMTAG_NAME)
                version = rpm_info.get(self.Tag.RPMTAG_VERSION)
                if name and version:
                    product_info = self.find_vendor(name, version)
                    if product_info is not None:
                        yield from product_info
                else:
                    # a lookup without both values cannot identify a product
                    self.logger.debug(
                        f"{filename} - name or version not found in rpm header"
                    )
        self.logger.debug(f"Done scanning file: {filename}")

0 comments on commit 061a8bb

Please sign in to comment.