Skip to content

Commit

Permalink
Merge pull request #175 from censys/adh/fix-cli-colors
Browse files Browse the repository at this point in the history
Fix CLI Colors
  • Loading branch information
thehappydinoa authored Oct 7, 2021
2 parents f798ffd + 4f14897 commit 0a37f96
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 97 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/python-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ name: Python CI

on:
push:
branches:
- main
paths-ignore:
- "**.rst"
- "**.md"
Expand Down
18 changes: 12 additions & 6 deletions censys/cli/commands/account.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import argparse
import sys

from rich import box, print, print_json
from rich import box
from rich.table import Table

from censys.cli.utils import console
from censys.common.exceptions import CensysUnauthorizedException
from censys.search.v2.api import CensysSearchAPIv2

Expand All @@ -19,20 +20,25 @@ def cli_account(args: argparse.Namespace): # pragma: no cover
client = CensysSearchAPIv2(args.api_id, args.api_secret)
account = client.account()
if args.json:
print_json(data=account)
console.print_json(data=account)
else:
table = Table("Key", "Value", show_header=False, box=box.SQUARE)
table = Table(
"Key", "Value", show_header=False, box=box.SQUARE, highlight=True
)
table.add_row("Email", account["email"])
table.add_row("Login ID", account["login"])
table.add_row("First Login", account["first_login"])
table.add_row("Last Login", account["last_login"][:-7])
quota = account["quota"]
table.add_row("Query Quota", f"{quota['used']} / {quota['allowance']}")
table.add_row(
"Query Quota",
f"{quota['used']} / {quota['allowance']} ({quota['used']/quota['allowance']:.2f}%)",
)
table.add_row("Quota Resets At", quota["resets_at"])
print(table)
console.print(table)
sys.exit(0)
except CensysUnauthorizedException:
print("Failed to authenticate")
console.print("Failed to authenticate")
sys.exit(1)


Expand Down
30 changes: 18 additions & 12 deletions censys/cli/commands/asm.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import json
import sys

from rich.prompt import Prompt
from rich.prompt import Confirm, Prompt

from censys.asm.seeds import SEED_TYPES, Seeds
from censys.cli.utils import console
from censys.common.config import DEFAULT, get_config, write_config
from censys.common.exceptions import CensysUnauthorizedException

Expand All @@ -26,21 +27,26 @@ def cli_asm_config(_: argparse.Namespace): # pragma: no cover
redacted_api_key = api_key.replace(api_key[:key_len], key_len * "*")
api_key_prompt = f"{api_key_prompt} [cyan]({redacted_api_key})[/cyan]"

api_key = Prompt.ask(api_key_prompt) or api_key
api_key = Prompt.ask(api_key_prompt, console=console) or api_key

if not api_key:
print("Please enter valid credentials")
console.print("Please enter valid credentials")
sys.exit(1)

color = Confirm.ask(
"Do you want color output?", default=True, show_default=False, console=console
)
config.set(DEFAULT, "color", "auto" if color else "")

try:
# Assumes that login was successfully
config.set(DEFAULT, "asm_api_key", api_key)

write_config(config)
print("\nSuccessfully configured credentials")
console.print("\nSuccessfully configured credentials")
sys.exit(0)
except CensysUnauthorizedException:
print("Failed to authenticate")
console.print("Failed to authenticate")
sys.exit(1)


Expand All @@ -61,7 +67,7 @@ def cli_add_seeds(args: argparse.Namespace):
try:
seeds = json.loads(data)
except json.decoder.JSONDecodeError as e:
print(f"Invalid json {e}")
console.print(f"Invalid json {e}")
sys.exit(1)

seeds_to_add = []
Expand All @@ -72,7 +78,7 @@ def cli_add_seeds(args: argparse.Namespace):
elif isinstance(seed, str):
seed = {"value": seed, "type": args.default_type}
else:
print(f"Invalid seed {seed}")
console.print(f"Invalid seed {seed}")
sys.exit(1)
if "label" not in seed:
seed["label"] = args.label_all
Expand All @@ -84,20 +90,20 @@ def cli_add_seeds(args: argparse.Namespace):
added_seeds = res["addedSeeds"]
added_count = len(added_seeds)
if not added_count:
print("No seeds were added. (Run with -v to get more info)")
console.print("No seeds were added. (Run with -v to get more info)")
if not args.verbose:
sys.exit(1)
else:
print(f"Added {added_count} seeds.")
console.print(f"Added {added_count} seeds.")
if added_count < to_add_count:
print(f"Seeds not added: {to_add_count - added_count}")
console.print(f"Seeds not added: {to_add_count - added_count}")
if args.verbose: # pragma: no cover
print(
console.print(
"The following seed(s) were not able to be added as they already exist or are reserved."
)
for seed in seeds_to_add:
if not any([s for s in added_seeds if seed["value"] == s["value"]]):
print(json.dumps(seed, indent=4))
console.print_json(seed)


def include(parent_parser: argparse._SubParsersAction, parents: dict):
Expand Down
20 changes: 13 additions & 7 deletions censys/cli/commands/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import os
import sys

from rich.prompt import Prompt
from rich.prompt import Confirm, Prompt

from censys.cli.utils import console
from censys.common.config import DEFAULT, get_config, write_config
from censys.common.exceptions import CensysUnauthorizedException
from censys.search.v2.api import CensysSearchAPIv2
Expand All @@ -27,7 +28,7 @@ def cli_config(_: argparse.Namespace): # pragma: no cover
api_secret_env = os.getenv("CENSYS_API_SECRET")

if api_id_env is not None or api_secret_env is not None:
print(
console.print(
"Please note environment variables (CENSYS_API_ID & CENSYS_API_SECRET) "
"will take priority over configured credentials."
)
Expand All @@ -40,16 +41,21 @@ def cli_config(_: argparse.Namespace): # pragma: no cover
api_id_prompt = f"{api_id_prompt} [cyan]({redacted_id})[/cyan]"
api_secret_prompt = f"{api_secret_prompt} [cyan]({redacted_secret})[/cyan]"

api_id = Prompt.ask(api_id_prompt) or api_id
api_secret = Prompt.ask(api_secret_prompt) or api_secret
api_id = Prompt.ask(api_id_prompt, console=console) or api_id
api_secret = Prompt.ask(api_secret_prompt, console=console) or api_secret

if not (api_id and api_secret):
print("Please enter valid credentials")
console.print("Please enter valid credentials")
sys.exit(1)

api_id = api_id.strip()
api_secret = api_secret.strip()

color = Confirm.ask(
"Do you want color output?", default=True, show_default=False, console=console
)
config.set(DEFAULT, "color", "auto" if color else "")

try:
client = CensysSearchAPIv2(api_id, api_secret)
account = client.account()
Expand All @@ -60,10 +66,10 @@ def cli_config(_: argparse.Namespace): # pragma: no cover
config.set(DEFAULT, "api_secret", api_secret)

write_config(config)
print(f"\nSuccessfully authenticated for {email}")
console.print(f"\nSuccessfully authenticated for {email}")
sys.exit(0)
except CensysUnauthorizedException:
print("Failed to authenticate")
console.print("Failed to authenticate")
sys.exit(1)


Expand Down
97 changes: 54 additions & 43 deletions censys/cli/commands/hnri.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
import argparse
import sys
import webbrowser
from typing import List, Optional, Tuple
from typing import Any, List, Optional, Tuple

import requests
from rich import print
from rich import box
from rich.table import Table

from censys.cli.utils import console
from censys.common.exceptions import CensysCLIException, CensysNotFoundException
from censys.search import CensysHosts

Expand Down Expand Up @@ -50,76 +52,87 @@ def translate_risk(self, services: List[dict]) -> Tuple[List[dict], List[dict]]:
medium_risk = []

for service in services:
port = service.get("port")
protocol = service.get("service_name")
string = f"{protocol} on {port}"
if protocol in self.HIGH_RISK_DEFINITION:
high_risk.append({"port": port, "protocol": protocol, "string": string})
elif protocol in self.MEDIUM_RISK_DEFINITION:
medium_risk.append(
{"port": port, "protocol": protocol, "string": string}
)
service_name = service.get("service_name")
if service_name in self.HIGH_RISK_DEFINITION:
high_risk.append(service)
elif service_name in self.MEDIUM_RISK_DEFINITION:
medium_risk.append(service)
else:
medium_risk.append(
{"port": port, "protocol": protocol, "string": string}
)
medium_risk.append(service)

return high_risk, medium_risk

@staticmethod
def risks_to_string(high_risk: list, medium_risk: list) -> str:
def make_risks_into_table(self, title: str, risks: List[dict]) -> Table:
"""Creates a table of risks.
Args:
title (str): Title of the table.
risks (list): List of risks.
Returns:
Table: Table of risks.
"""
table = Table("Port", "Service Name", title=title, box=box.SQUARE)
for risk in risks:
table.add_row(str(risk.get("port")), risk.get("service_name"))
return table

def risks_to_string(self, high_risks: list, medium_risks: list) -> List[Any]:
"""Risks to printable string.
Args:
high_risk (list): Lists of high risks.
medium_risk (list): Lists of medium risks.
high_risks (list): Lists of high risks.
medium_risks (list): Lists of medium risks.
Raises:
CensysCLIException: No information/risks found.
Returns:
str: Printable string for CLI.
list: Printable objects for CLI.
"""
len_high_risk = len(high_risk)
len_medium_risk = len(medium_risk)
len_high_risk = len(high_risks)
len_medium_risk = len(medium_risks)

if len_high_risk + len_medium_risk == 0:
raise CensysCLIException

response = ""
response: List[Any] = []
if len_high_risk > 0:
response = (
response
+ "[bold red]:exclamation: High Risks Found:[/bold red] \n"
+ "\n".join([risk.get("string") for risk in high_risk])
response.append(
self.make_risks_into_table(
":exclamation: High Risks Found",
high_risks,
)
)
else:
response = response + "You don't have any High Risks in your network\n"
response.append("You don't have any High Risks in your network\n")
if len_medium_risk > 0:
response = (
response
+ "[bold orange]:grey_exclamation: Medium Risks Found:[/bold orange] \n"
+ "\n".join([risk.get("string") for risk in medium_risk])
response.append(
self.make_risks_into_table(
":grey_exclamation: Medium Risks Found",
medium_risks,
)
)
else:
response = response + "You don't have any Medium Risks in your network\n"
response.append("You don't have any Medium Risks in your network\n")
return response

def view_current_ip_risks(self) -> str:
"""Gets protocol information for the current IP and returns any risks.
Returns:
str: Printable
"""
def view_current_ip_risks(self):
"""Gets protocol information for the current IP and returns any risks."""
current_ip = self.get_current_ip()

try:
console.print(f"Searching for information on {current_ip}...")
results = self.index.view(current_ip)
services = results.get("services", [])
high_risk, medium_risk = self.translate_risk(services)
return self.risks_to_string(high_risk, medium_risk)
for res in self.risks_to_string(high_risk, medium_risk):
console.print(res)
console.print(
f"\nFor more information, please visit: https://search.censys.io/hosts/{current_ip}"
)
except (CensysNotFoundException, CensysCLIException):
return (
console.print(
"[green]:white_check_mark: No Risks were found on your network[/green]"
)

Expand All @@ -136,9 +149,7 @@ def cli_hnri(args: argparse.Namespace):

client = CensysHNRI(args.api_id, args.api_secret)

risks = client.view_current_ip_risks()

print(risks)
client.view_current_ip_risks()


def include(parent_parser: argparse._SubParsersAction, parents: dict):
Expand Down
4 changes: 2 additions & 2 deletions censys/cli/commands/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import List
from urllib.parse import urlencode

from ..utils import INDEXES, V1_INDEXES, V2_INDEXES, console, write_file
from censys.cli.utils import INDEXES, V1_INDEXES, V2_INDEXES, console, write_file
from censys.common.exceptions import CensysCLIException
from censys.search import SearchClient

Expand Down Expand Up @@ -142,7 +142,7 @@ def cli_search(args: argparse.Namespace):
try:
write_file(results, **write_args)
except ValueError as error: # pragma: no cover
print(f"Error writing log file. Error: {error}")
console.print(f"Error writing log file. Error: {error}")


def include(parent_parser: argparse._SubParsersAction, parents: dict):
Expand Down
7 changes: 4 additions & 3 deletions censys/cli/commands/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import sys
import webbrowser

from ..utils import V2_INDEXES, valid_datetime_type, write_file
from censys.cli.utils import V2_INDEXES, console, valid_datetime_type, write_file
from censys.search import SearchClient
from censys.search.v2.api import CensysSearchAPIv2


def cli_view(args: argparse.Namespace):
Expand All @@ -29,7 +30,7 @@ def cli_view(args: argparse.Namespace):

c = SearchClient(**censys_args)

index = getattr(c.v2, args.index_type)
index: CensysSearchAPIv2 = getattr(c.v2, args.index_type)

view_args = {}
write_args = {
Expand All @@ -46,7 +47,7 @@ def cli_view(args: argparse.Namespace):
try:
write_file(document, **write_args)
except ValueError as error: # pragma: no cover
print(f"Error writing log file. Error: {error}")
console.print(f"Error writing log file. Error: {error}")


def include(parent_parser: argparse._SubParsersAction, parents: dict):
Expand Down
Loading

0 comments on commit 0a37f96

Please sign in to comment.