Skip to content

Commit

Permalink
Merge pull request #11 from oh2fih/workflows
Browse files Browse the repository at this point in the history
Add ShellCheck & Black formatting workflows
  • Loading branch information
adulau authored Apr 15, 2024
2 parents a228c7b + 01621fb commit 6dbf87b
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 54 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/black.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: Black formatting
on:
push:
branches:
- main
pull_request:
branches:
- main

jobs:
black:
name: Black
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Check Black formatting for Python scripts
uses: psf/black@stable
with:
options: --check --diff --verbose
src: .
22 changes: 22 additions & 0 deletions .github/workflows/shell.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
name: ShellCheck
on:
push:
branches:
- main
pull_request:
branches:
- main

jobs:
shellcheck:
name: ShellCheck
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run ShellCheck for shell scripts
uses: ludeeus/action-shellcheck@master
with:
severity: style
scandir: .
format: gcc
version: stable
72 changes: 36 additions & 36 deletions bin/import.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from dynaconf import Dynaconf

# Configuration
settings = Dynaconf(settings_files=['../config/settings.yaml'])
settings = Dynaconf(settings_files=["../config/settings.yaml"])
cpe_path = settings.cpe.path
cpe_source = settings.cpe.source
rdb = redis.Redis(host=settings.redis.host, port=settings.redis.port, db=8)
Expand All @@ -33,32 +33,32 @@ def __init__(self):

def startElement(self, tag, attributes):
self.CurrentData = tag
if tag == 'cpe-23:cpe23-item':
self.record['cpe-23'] = attributes['name']
if tag == 'title':
if tag == "cpe-23:cpe23-item":
self.record["cpe-23"] = attributes["name"]
if tag == "title":
self.title_seen = True
if tag == 'reference':
self.refs.append(attributes['href'])
if tag == "reference":
self.refs.append(attributes["href"])

def characters(self, data):
if self.title_seen:
self.title = self.title + data

def endElement(self, tag):
if tag == 'title':
self.record['title'] = self.title
if tag == "title":
self.record["title"] = self.title
self.title = ""
self.title_seen = False
if tag == 'references':
self.record['refs'] = self.refs
if tag == "references":
self.record["refs"] = self.refs
self.refs = []
if tag == 'cpe-item':
to_insert = CPEExtractor(cpe=self.record['cpe-23'])
for word in canonize(to_insert['vendor']):
insert(word=word, cpe=to_insert['cpeline'])
if tag == "cpe-item":
to_insert = CPEExtractor(cpe=self.record["cpe-23"])
for word in canonize(to_insert["vendor"]):
insert(word=word, cpe=to_insert["cpeline"])
self.wordcount += 1
for word in canonize(to_insert['product']):
insert(word=word, cpe=to_insert['cpeline'])
for word in canonize(to_insert["product"]):
insert(word=word, cpe=to_insert["cpeline"])
self.wordcount += 1
self.record = {}
self.itemcount += 1
Expand All @@ -74,18 +74,18 @@ def CPEExtractor(cpe=None):
return False
record = {}
cpefield = cpe.split(":")
record['vendor'] = cpefield[3]
record['product'] = cpefield[4]
record["vendor"] = cpefield[3]
record["product"] = cpefield[4]
cpeline = ""
for cpeentry in cpefield[:5]:
cpeline = f"{cpeline}:{cpeentry}"
record['cpeline'] = cpeline[1:]
record["cpeline"] = cpeline[1:]
return record


def canonize(value=None):
value = value.lower()
words = value.split('_')
words = value.split("_")
return words


Expand All @@ -97,30 +97,30 @@ def insert(word=None, cpe=None):
rdb.zadd("rank:cpe", {cpe: 1}, incr=True)


if __name__ == '__main__':
if __name__ == "__main__":
argparser = argparse.ArgumentParser(
description='Initializes the Redis database with CPE dictionary.'
description="Initializes the Redis database with CPE dictionary."
)
argparser.add_argument(
'--download',
'-d',
action='count',
"--download",
"-d",
action="count",
default=0,
help='Download the CPE dictionary even if it already exists.',
help="Download the CPE dictionary even if it already exists.",
)
argparser.add_argument(
'--replace',
'-r',
action='count',
"--replace",
"-r",
action="count",
default=0,
help='Flush and repopulated the CPE database.',
help="Flush and repopulated the CPE database.",
)
argparser.add_argument(
'--update',
'-u',
action='store_true',
"--update",
"-u",
action="store_true",
default=False,
help='Update the CPE database without flushing',
help="Update the CPE database without flushing",
)
args = argparser.parse_args()

Expand All @@ -144,8 +144,8 @@ def insert(word=None, cpe=None):

print(f"Uncompressing {cpe_path}.gz ...")
try:
with gzip.open(f"{cpe_path}.gz", 'rb') as cpe_gz:
with open(cpe_path, 'wb') as cpe_xml:
with gzip.open(f"{cpe_path}.gz", "rb") as cpe_gz:
with open(cpe_path, "wb") as cpe_xml:
shutil.copyfileobj(cpe_gz, cpe_xml)
os.remove(f"{cpe_path}.gz")
except (FileNotFoundError, PermissionError) as e:
Expand Down
12 changes: 6 additions & 6 deletions bin/lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
sys.path.append(os.path.join(runPath, ".."))
from lib.cpeguesser import CPEGuesser

if __name__ == '__main__':
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Find potential CPE names from a list of keyword(s) and return a JSON of the results'
description="Find potential CPE names from a list of keyword(s) and return a JSON of the results"
)
parser.add_argument(
'word',
metavar='WORD',
"word",
metavar="WORD",
type=str,
nargs='+',
help='One or more keyword(s) to lookup',
nargs="+",
help="One or more keyword(s) to lookup",
)
args = parser.parse_args()

Expand Down
14 changes: 7 additions & 7 deletions bin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dynaconf import Dynaconf

# Configuration
settings = Dynaconf(settings_files=['../config/settings.yaml'])
settings = Dynaconf(settings_files=["../config/settings.yaml"])
port = settings.server.port

runPath = os.path.dirname(os.path.realpath(__file__))
Expand All @@ -20,31 +20,31 @@
class Search:
def on_post(self, req, resp):
data_post = req.bounded_stream.read()
js = data_post.decode('utf-8')
js = data_post.decode("utf-8")
try:
q = json.loads(js)
except ValueError:
resp.status = falcon.HTTP_400
resp.media = "Missing query array or incorrect JSON format"
return

if 'query' in q:
if "query" in q:
pass
else:
resp.status = falcon.HTTP_400
resp.media = "Missing query array or incorrect JSON format"
return

cpeGuesser = CPEGuesser()
resp.media = cpeGuesser.guessCpe(q['query'])
resp.media = cpeGuesser.guessCpe(q["query"])


if __name__ == '__main__':
if __name__ == "__main__":
app = falcon.App()
app.add_route('/search', Search())
app.add_route("/search", Search())

try:
with make_server('', port, app) as httpd:
with make_server("", port, app) as httpd:
print(f"Serving on port {port}...")
httpd.serve_forever()
except OSError as e:
Expand Down
14 changes: 9 additions & 5 deletions lib/cpeguesser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
from dynaconf import Dynaconf

# Configuration
settings = Dynaconf(
settings_files=['../config/settings.yaml']
)
settings = Dynaconf(settings_files=["../config/settings.yaml"])


class CPEGuesser:
def __init__(self):
self.rdb = redis.Redis(host=settings.redis.host, port=settings.redis.port, db=8, decode_responses=True)
self.rdb = redis.Redis(
host=settings.redis.host,
port=settings.redis.port,
db=8,
decode_responses=True,
)

def guessCpe(self, words):
k = []
Expand All @@ -28,7 +32,7 @@ def guessCpe(self, words):
ranked = []

for cpe in result:
rank = self.rdb.zrank('rank:cpe', cpe)
rank = self.rdb.zrank("rank:cpe", cpe)
ranked.append((rank, cpe))

return sorted(ranked)

0 comments on commit 6dbf87b

Please sign in to comment.