Skip to content

Reckless direct install #8241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 74 additions & 26 deletions tools/reckless
Original file line number Diff line number Diff line change
Expand Up @@ -1175,10 +1175,11 @@ def _install_plugin(src: InstInfo) -> Union[InstInfo, None]:
log.debug(f'{clone_path} already exists - deleting')
shutil.rmtree(clone_path)
if src.srctype == Source.DIRECTORY:
full_source_path = Path(src.source_loc) / src.subdir
log.debug(("copying local directory contents from"
f" {src.source_loc}"))
f" {full_source_path}"))
create_dir(clone_path)
shutil.copytree(src.source_loc, plugin_path)
shutil.copytree(full_source_path, plugin_path)
elif src.srctype in [Source.LOCAL_REPO, Source.GITHUB_REPO,
Source.OTHER_URL, Source.GIT_LOCAL_CLONE]:
# clone git repository to /tmp/reckless-...
Expand Down Expand Up @@ -1297,6 +1298,36 @@ def _install_plugin(src: InstInfo) -> Union[InstInfo, None]:
return staged_src


def location_from_name(plugin_name: str) -> (str, str):
"""Maybe the location was passed in place of the plugin name. Check
if this looks like a filepath or URL and return that as well as the
plugin name."""
if not Path(plugin_name).exists():
try:
parsed = urlparse(path)
if parsed.scheme in ['http', 'https']:
return (plugin_name, Path(plugin_name).with_suffix('').name)
except ValueError:
pass
# No path included, return the name only.
return (None, plugin_name)

# Directory containing the plugin? The plugin name should match the dir.
if os.path.isdir(plugin_name):
return (Path(plugin_name).parent, Path(plugin_name).name)

# Possibly the entrypoint itself was passed?
elif os.path.isfile(plugin_name):
if Path(plugin_name).with_suffix('').name != Path(plugin_name).parent.name or \
not Path(plugin_name).parent.parent.exists():
# If the directory is not named for the plugin, we can't infer what
# should be done.
# FIXME: return InstInfo with entrypoint rather than source str.
return (None, plugin_name)
# We have to make inferences as to the naming here.
return (Path(plugin_name).parent.parent, Path(plugin_name).with_suffix('').name)


def install(plugin_name: str) -> Union[str, None]:
"""Downloads plugin from source repos, installs and activates plugin.
Returns the location of the installed plugin or "None" in the case of
Expand All @@ -1309,33 +1340,48 @@ def install(plugin_name: str) -> Union[str, None]:
else:
name = plugin_name
commit = None
log.debug(f"Searching for {name}")
if search(name):
global LAST_FOUND
src = LAST_FOUND
src.commit = commit
log.debug(f'Retrieving {src.name} from {src.source_loc}')
try:
installed = _install_plugin(src)
except FileExistsError as err:
log.error(f'File exists: {err.filename}')
return None
LAST_FOUND = None
if not installed:
log.warning(f'{plugin_name}: installation aborted')
# Is the install request specifying a path to the plugin?
direct_location, name = location_from_name(name)
if direct_location:
logging.debug(f"install of {name} requested from {direct_location}")
src = InstInfo(name, direct_location, None)
if not src.get_inst_details():
src = None
# Treating a local git repo as a directory allows testing
# uncommitted changes.
if src and src.srctype == Source.LOCAL_REPO:
src.srctype = Source.DIRECTORY
if not direct_location or not src:
log.debug(f"direct_location {direct_location}, src: {src}")
log.debug(f"Searching for {name}")
if search(name):
global LAST_FOUND
src = LAST_FOUND
src.commit = commit
log.debug(f'Retrieving {src.name} from {src.source_loc}')
else:
return None

# Match case of the containing directory
for dirname in os.listdir(RECKLESS_CONFIG.reckless_dir):
if dirname.lower() == installed.name.lower():
inst_path = Path(RECKLESS_CONFIG.reckless_dir)
inst_path = inst_path / dirname / installed.entry
RECKLESS_CONFIG.enable_plugin(inst_path)
enable(installed.name)
return f"{installed.source_loc}"
log.error(('dynamic activation failed: '
f'{installed.name} not found in reckless directory'))
try:
installed = _install_plugin(src)
except FileExistsError as err:
log.error(f'File exists: {err.filename}')
return None
LAST_FOUND = None
if not installed:
log.warning(f'{plugin_name}: installation aborted')
return None

# Match case of the containing directory
for dirname in os.listdir(RECKLESS_CONFIG.reckless_dir):
if dirname.lower() == installed.name.lower():
inst_path = Path(RECKLESS_CONFIG.reckless_dir)
inst_path = inst_path / dirname / installed.entry
RECKLESS_CONFIG.enable_plugin(inst_path)
enable(installed.name)
return f"{installed.source_loc}"
log.error(('dynamic activation failed: '
f'{installed.name} not found in reckless directory'))
return None


Expand Down Expand Up @@ -1385,6 +1431,8 @@ def search(plugin_name: str) -> Union[InstInfo, None]:
if srctype in [Source.DIRECTORY, Source.LOCAL_REPO,
Source.GITHUB_REPO, Source.OTHER_URL]:
found = _source_search(plugin_name, source)
if found:
log.debug(f"{found}, {found.srctype}")
if not found:
continue
log.info(f"found {found.name} in source: {found.source_loc}")
Expand Down
Loading