Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,5 @@ venv/*
.env.local

.gitsigners

*.key
10 changes: 5 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ dependencies = [
"aiodns==3.1.1",
"aiohttp==3.11.15",
"aleph-message>=1.0.5",
"aleph-sdk-python @ git+https://github.com/aleph-im/aleph-sdk-python@1yam-better-filter-crn",
"base58==2.1.1", # Needed now as default with _load_account changement
"aleph-sdk-python==2.3",
"base58==2.1.1", # Needed now as default with _load_account changement
"click<8.2",
"ledgerblue>=0.1.48",
"ledgereth>=0.10",
"py-sr25519-bindings==0.2", # Needed for DOT signatures
"py-sr25519-bindings==0.2", # Needed for DOT signatures
"pydantic>=2",
"pygments==2.19.1",
"pynacl==1.5", # Needed now as default with _load_account changement
"pynacl==1.5", # Needed now as default with _load_account changement
"python-magic==0.4.27",
"rich==13.9.*",
"setuptools>=65.5",
"substrate-interface==1.7.11", # Needed for DOT signatures
"substrate-interface==1.7.11", # Needed for DOT signatures
"textual==0.73",
"typer==0.15.2",
]
Expand Down
2 changes: 2 additions & 0 deletions src/aleph_client/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
about,
account,
aggregate,
authorizations,
credit,
domain,
files,
Expand All @@ -26,6 +27,7 @@
help="Manage messages (post, amend, watch and forget) on Aleph Cloud",
)
app.add_typer(aggregate.app, name="aggregate", help="Manage aggregate messages and permissions on Aleph Cloud")
app.add_typer(authorizations.app, name="authorizations", help="Manage authorizations")
app.add_typer(files.app, name="file", help="Manage files (upload and pin on IPFS) on Aleph Cloud")
app.add_typer(program.app, name="program", help="Manage programs (micro-VMs) on Aleph Cloud")
app.add_typer(instance.app, name="instance", help="Manage instances (VMs) on Aleph Cloud")
Expand Down
223 changes: 1 addition & 222 deletions src/aleph_client/commands/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from aleph.sdk.client import AlephHttpClient, AuthenticatedAlephHttpClient
from aleph.sdk.conf import settings
from aleph.sdk.utils import extended_json_encoder
from aleph_message.models import Chain, MessageType
from aleph_message.models import Chain
from aleph_message.status import MessageStatus
from rich.console import Console
from rich.panel import Panel
Expand Down Expand Up @@ -285,224 +285,3 @@ async def list_aggregates(
)

return aggregates


@app.command()
async def authorize(
address: Annotated[str, typer.Argument(help=help_strings.TARGET_ADDRESS)],
chain: Annotated[Optional[Chain], typer.Option(help="Only on specified chain")] = None,
types: Annotated[
Optional[str], typer.Option(help="Only for specified message types (comma separated list)")
] = None,
channels: Annotated[Optional[str], typer.Option(help="Only on specified channels (comma separated list)")] = None,
post_types: Annotated[
Optional[str], typer.Option(help="Only for specified post types (comma separated list)")
] = None,
aggregate_keys: Annotated[
Optional[str], typer.Option(help="Only for specified aggregate keys (comma separated list)")
] = None,
private_key: Annotated[Optional[str], typer.Option(help=help_strings.PRIVATE_KEY)] = settings.PRIVATE_KEY_STRING,
private_key_file: Annotated[
Optional[Path], typer.Option(help=help_strings.PRIVATE_KEY_FILE)
] = settings.PRIVATE_KEY_FILE,
print_message: bool = False,
verbose: bool = True,
debug: bool = False,
):
"""Grant specific publishing permissions to an address to act on behalf of this account"""

setup_logging(debug)

account: AccountTypes = load_account(private_key_str=private_key, private_key_file=private_key_file, chain=chain)

data = await get(
key="security",
subkeys="authorizations",
address=account.get_address(),
private_key=private_key,
private_key_file=private_key_file,
verbose=False,
debug=debug,
)

authorizations = None
if data:
authorizations = data.get("authorizations")
new_auth: dict = {"address": address}
if chain:
new_auth["chain"] = chain.value
if types:
valid_types = []
for t in types.split(","):
try:
valid_types.append(MessageType(t.upper()).value)
except ValueError as e:
logger.error(
f"Invalid value passed into `--types`: {t}\n"
f"Valid values: {', '.join([e.value for e in MessageType])}"
)
raise typer.Exit(1) from e
new_auth["types"] = valid_types
if channels:
new_auth["channels"] = channels.split(",")
if post_types:
new_auth["post_types"] = post_types.split(",")
if aggregate_keys:
new_auth["aggregate_keys"] = aggregate_keys.split(",")
authorizations.append(new_auth)
if authorizations:
success = await post(
key="security",
subkey="authorizations",
content=dumps(authorizations),
address=None,
channel=settings.DEFAULT_CHANNEL,
inline=True,
sync=True,
private_key=private_key,
private_key_file=private_key_file,
print_message=print_message,
verbose=False,
debug=debug,
)
if verbose:
if success:
typer.echo(f"Permissions has been added for {address}")
else:
typer.echo(f"Failed to add permissions for {address}")


@app.command()
async def revoke(
address: Annotated[str, typer.Argument(help=help_strings.TARGET_ADDRESS)],
private_key: Annotated[Optional[str], typer.Option(help=help_strings.PRIVATE_KEY)] = settings.PRIVATE_KEY_STRING,
private_key_file: Annotated[
Optional[Path], typer.Option(help=help_strings.PRIVATE_KEY_FILE)
] = settings.PRIVATE_KEY_FILE,
chain: Annotated[Optional[Chain], typer.Option(help=help_strings.ADDRESS_CHAIN)] = None,
print_message: bool = False,
verbose: bool = True,
debug: bool = False,
):
"""Revoke all publishing permissions from an address acting on behalf of this account"""

setup_logging(debug)

account: AccountTypes = load_account(private_key_file=private_key, private_key_str=private_key_file, chain=chain)

data = await get(
key="security",
subkeys="authorizations",
address=account.get_address(),
private_key=private_key,
private_key_file=private_key_file,
verbose=False,
debug=debug,
)

authorizations = None
if data:
old_authorizations = data.get("authorizations")
authorizations = [item for item in old_authorizations if item.get("address", "") != address]
if old_authorizations != authorizations:
success = await post(
key="security",
subkey="authorizations",
content=dumps(authorizations),
address=None,
channel=settings.DEFAULT_CHANNEL,
inline=True,
sync=True,
private_key=private_key,
private_key_file=private_key_file,
print_message=print_message,
verbose=False,
debug=debug,
)
if verbose:
if success:
typer.echo(f"Permissions has been deleted for {address}")
else:
typer.echo(f"Failed to delete permissions for {address}")
elif verbose:
typer.echo(f"No permission found for {address}. Ignored")


@app.command()
async def permissions(
address: Annotated[Optional[str], typer.Option(help=help_strings.TARGET_ADDRESS)] = None,
private_key: Annotated[Optional[str], typer.Option(help=help_strings.PRIVATE_KEY)] = settings.PRIVATE_KEY_STRING,
private_key_file: Annotated[
Optional[Path], typer.Option(help=help_strings.PRIVATE_KEY_FILE)
] = settings.PRIVATE_KEY_FILE,
chain: Annotated[Optional[Chain], typer.Option(help=help_strings.ADDRESS_CHAIN)] = None,
json: Annotated[bool, typer.Option(help="Print as json instead of rich table")] = False,
verbose: bool = True,
debug: bool = False,
) -> Optional[dict]:
"""Display all permissions emitted by an account"""

setup_logging(debug)

_, address = get_account_and_address(
private_key=private_key, private_key_file=private_key_file, chain=chain, address=address
)

data = await get(
key="security",
subkeys="authorizations",
address=address,
private_key=private_key,
private_key_file=private_key_file,
verbose=False,
debug=debug,
)

authorizations = None
if data:
authorizations = data.get("authorizations")
if authorizations:
if json:
typer.echo(dumps(authorizations, indent=4, default=extended_json_encoder))
elif verbose:
infos = [
Text.from_markup(f"Address: [bright_cyan]{address}[/bright_cyan]\n\nAuthorizations by address:"),
]
auth_addresses: dict = {}
for auth in authorizations:
addr = auth["address"]
auth_address = auth_addresses[addr] = auth_addresses.get(addr, [])
keys = ["chain", "channels", "types", "post_types", "aggregate_keys"]
item = {key: auth.get(key) for key in keys if auth.get(key) is not None}
auth_address.append(item)
for addr, allowances in auth_addresses.items():
infos.append(
Text.from_markup(f"\n↳ [orange1]{addr}[/orange1]"),
)
for item in allowances:
display_item = "[green]all[/green]"
if item:
display_item = ", ".join(
[
"[orchid]{key}([white]"
f"{value if isinstance(value, list) else ', '.join(value)}"
"[/white])[/orchid]"
for key, value in item.items()
]
)
infos.append(Text.from_markup(f"\n• {display_item}"))

console = Console()
console.print(
Panel(
Text.assemble(*infos),
title="Permissions",
border_style="bright_cyan",
expand=False,
title_align="left",
)
)
if not authorizations and verbose:
typer.echo(f"Address: {address}\n\nNo permission data found\n")

return authorizations
Loading