mirror of https://github.com/ohmyzsh/ohmyzsh.git synced 2026-02-12 05:49:47 +08:00

Compare commits

...

10 Commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| ohmyzsh[bot] | b1c5315a5f | feat(wd): update to version v0.6.1 (#12413); Co-authored-by: ohmyzsh[bot] <54982679+ohmyzsh[bot]@users.noreply.github.com> | 2024-05-12 12:42:59 +02:00 |
| Carlo Sala | 0493eab8ce | fix(dependencies): check if repo is clean before committing | 2024-05-12 12:40:45 +02:00 |
| Carlo Sala | 1d31ff6037 | ci(dependencies): fetch all branches | 2024-05-12 12:30:22 +02:00 |
| Carlo Sala | eff648aab0 | ci(dependencies): use setup-python and enable cron-based jobs | 2024-05-12 12:26:30 +02:00 |
| Carlo Sala | eb2ff84a2c | fix(dependencies): avoid creating PR if it's already there | 2024-05-12 12:26:30 +02:00 |
| Carlo Sala | 423b9a8ded | feat(dependencies): add support for semver tags | 2024-05-12 12:26:30 +02:00 |
| Carlo Sala | a258eb4547 | fix(dependencies): improve typing | 2024-05-12 12:26:30 +02:00 |
| Carlo Sala | 13c8a10e39 | style(dependencies): run ruff formatter | 2024-05-12 12:26:30 +02:00 |
| Carlo Sala | 83110e8ce1 | chore(dependencies): update requirements.txt | 2024-05-12 12:26:30 +02:00 |
| Marc Cornellà | d91f4e83ef | fix(fzf): fix missing is-at-least error in setup (#12412); Fixes #12412 | 2024-05-12 09:45:26 +02:00 |
9 changed files with 553 additions and 346 deletions

View File

@@ -6,3 +6,6 @@ insert_final_newline = true
charset = utf-8
indent_size = 2
indent_style = space
[*.py]
indent_size = 4

View File

@@ -39,7 +39,7 @@ dependencies:
plugins/wd:
repo: mfaerevaag/wd
branch: master
version: tag:v0.6.0
version: tag:v0.6.1
precopy: |
set -e
rm -r test
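
The `version: tag:v0.6.1` bump above edits the manifest entry that the rewritten updater script later in this compare consumes. As a rough sketch of how such an entry is interpreted (illustrative only; the inline YAML literal is a made-up stand-in mirroring the fields shown above, not the real file):

```python
# Sketch: parse a dependencies.yml-style entry and detect tag pinning,
# the way the updater distinguishes "tag:" versions from branch-tracking ones.
import yaml

example = """
dependencies:
  plugins/wd:
    repo: mfaerevaag/wd
    branch: master
    version: tag:v0.6.1
"""

data = yaml.safe_load(example)
for path, dep in data["dependencies"].items():
    version = dep["version"]
    if version.startswith("tag:"):
        print(f"{path}: pinned to tag {version.replace('tag:', '')} of {dep['repo']}")
    else:
        print(f"{path}: tracking branch {dep['branch']} of {dep['repo']} at {version}")
```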

View File

@@ -1,8 +1,8 @@
name: Update dependencies
on:
workflow_dispatch: {}
# schedule:
# - cron: '34 3 * * */8'
schedule:
- cron: "34 3 * * */8"
jobs:
check:
@@ -12,12 +12,19 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Authenticate as @ohmyzsh
id: generate_token
uses: ohmyzsh/github-app-token@v2
with:
app_id: ${{ secrets.OHMYZSH_APP_ID }}
private_key: ${{ secrets.OHMYZSH_APP_PRIVATE_KEY }}
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: "pip"
- name: Process dependencies
env:
GH_TOKEN: ${{ steps.generate_token.outputs.token }}
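
The previously commented-out cron trigger is re-enabled here; GitHub Actions evaluates `schedule` expressions in UTC. To sanity-check when `34 3 * * */8` fires, a small preview with the third-party `croniter` package works (an assumption made purely for illustration; the workflow itself does not use it):

```python
# Sketch: print the next few firing times of the re-enabled schedule.
# Requires `pip install croniter`; not part of this repository's tooling.
from datetime import datetime

from croniter import croniter

schedule = croniter("34 3 * * */8", datetime(2024, 5, 12))
for _ in range(3):
    print(schedule.get_next(datetime))  # every hit lands at 03:34
```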

View File

@@ -1,2 +1,7 @@
PyYAML~=6.0.1
requests~=2.31.0
certifi==2024.2.2
charset-normalizer==3.3.2
idna==3.7
PyYAML==6.0.1
requests==2.31.0
semver==3.0.2
urllib3==2.2.1

View File

@@ -1,11 +1,16 @@
import json
import os
import re
import shutil
import subprocess
import sys
import requests
import shutil
import yaml
import timeit
from copy import deepcopy
from typing import Optional, TypedDict
from typing import Literal, NotRequired, Optional, TypedDict
import requests
import yaml
from semver import Version
# Get TMP_DIR variable from environment
TMP_DIR = os.path.join(os.environ.get("TMP_DIR", "/tmp"), "ohmyzsh")
@@ -14,28 +19,58 @@ DEPS_YAML_FILE = ".github/dependencies.yml"
# Dry run flag
DRY_RUN = os.environ.get("DRY_RUN", "0") == "1"
import timeit
# utils for tag comparison
BASEVERSION = re.compile(
r"""[vV]?
(?P<major>(0|[1-9])\d*)
(\.
(?P<minor>(0|[1-9])\d*)
(\.
(?P<patch>(0|[1-9])\d*)
)?
)?
""",
re.VERBOSE,
)
def coerce(version: str) -> Optional[Version]:
match = BASEVERSION.search(version)
if not match:
return None
# BASEVERSION looks for `MAJOR.minor.patch` in the string given
# it fills with None if any of them is missing (for example `2.1`)
ver = {
key: 0 if value is None else value for key, value in match.groupdict().items()
}
# Version takes `major`, `minor`, `patch` arguments
ver = Version(**ver) # pyright: ignore[reportArgumentType]
return ver
class CodeTimer:
def __init__(self, name=None):
self.name = " '" + name + "'" if name else ''
def __init__(self, name=None):
self.name = " '" + name + "'" if name else ""
def __enter__(self):
self.start = timeit.default_timer()
def __enter__(self):
self.start = timeit.default_timer()
def __exit__(self, exc_type, exc_value, traceback):
self.took = (timeit.default_timer() - self.start) * 1000.0
print('Code block' + self.name + ' took: ' + str(self.took) + ' ms')
def __exit__(self, exc_type, exc_value, traceback):
self.took = (timeit.default_timer() - self.start) * 1000.0
print("Code block" + self.name + " took: " + str(self.took) + " ms")
### YAML representation
def str_presenter(dumper, data):
"""
Configures yaml for dumping multiline strings
Ref: https://stackoverflow.com/a/33300001
"""
if len(data.splitlines()) > 1: # check for multiline string
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
return dumper.represent_scalar('tag:yaml.org,2002:str', data)
"""
Configures yaml for dumping multiline strings
Ref: https://stackoverflow.com/a/33300001
"""
if len(data.splitlines()) > 1: # check for multiline string
return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
return dumper.represent_scalar("tag:yaml.org,2002:str", data)
yaml.add_representer(str, str_presenter)
yaml.representer.SafeRepresenter.add_representer(str, str_presenter)
@@ -43,408 +78,511 @@ yaml.representer.SafeRepresenter.add_representer(str, str_presenter)
# Types
class DependencyDict(TypedDict):
repo: str
branch: str
version: str
precopy: Optional[str]
postcopy: Optional[str]
repo: str
branch: str
version: str
precopy: NotRequired[str]
postcopy: NotRequired[str]
class DependencyYAML(TypedDict):
dependencies: dict[str, DependencyDict]
dependencies: dict[str, DependencyDict]
class UpdateStatus(TypedDict):
has_updates: bool
version: Optional[str]
compare_url: Optional[str]
head_ref: Optional[str]
head_url: Optional[str]
class UpdateStatusFalse(TypedDict):
has_updates: Literal[False]
class UpdateStatusTrue(TypedDict):
has_updates: Literal[True]
version: str
compare_url: str
head_ref: str
head_url: str
class CommandRunner:
class Exception(Exception):
def __init__(self, message, returncode, stage, stdout, stderr):
super().__init__(message)
self.returncode = returncode
self.stage = stage
self.stdout = stdout
self.stderr = stderr
class Exception(Exception):
def __init__(self, message, returncode, stage, stdout, stderr):
super().__init__(message)
self.returncode = returncode
self.stage = stage
self.stdout = stdout
self.stderr = stderr
@staticmethod
def run_or_fail(command: list[str], stage: str, *args, **kwargs):
if DRY_RUN and command[0] == "gh":
command.insert(0, "echo")
@staticmethod
def run_or_fail(command: list[str], stage: str, *args, **kwargs):
if DRY_RUN and command[0] == "gh":
command.insert(0, "echo")
result = subprocess.run(command, *args, capture_output=True, **kwargs)
result = subprocess.run(command, *args, capture_output=True, **kwargs)
if result.returncode != 0:
raise CommandRunner.Exception(
f"{stage} command failed with exit code {result.returncode}", returncode=result.returncode,
stage=stage,
stdout=result.stdout.decode("utf-8"),
stderr=result.stderr.decode("utf-8")
)
if result.returncode != 0:
raise CommandRunner.Exception(
f"{stage} command failed with exit code {result.returncode}",
returncode=result.returncode,
stage=stage,
stdout=result.stdout.decode("utf-8"),
stderr=result.stderr.decode("utf-8"),
)
return result
return result
class DependencyStore:
store: DependencyYAML = {
"dependencies": {}
}
store: DependencyYAML = {"dependencies": {}}
@staticmethod
def set(data: DependencyYAML):
DependencyStore.store = data
@staticmethod
def set(data: DependencyYAML):
DependencyStore.store = data
@staticmethod
def update_dependency_version(path: str, version: str) -> DependencyYAML:
with CodeTimer(f"store deepcopy: {path}"):
store_copy = deepcopy(DependencyStore.store)
@staticmethod
def update_dependency_version(path: str, version: str) -> DependencyYAML:
with CodeTimer(f"store deepcopy: {path}"):
store_copy = deepcopy(DependencyStore.store)
dependency = store_copy["dependencies"].get(path, {})
dependency["version"] = version
store_copy["dependencies"][path] = dependency
dependency = store_copy["dependencies"].get(path)
if dependency is None:
raise ValueError(f"Dependency {path} {version} not found")
dependency["version"] = version
store_copy["dependencies"][path] = dependency
return store_copy
return store_copy
@staticmethod
def write_store(file: str, data: DependencyYAML):
with open(file, "w") as yaml_file:
yaml.safe_dump(data, yaml_file, sort_keys=False)
@staticmethod
def write_store(file: str, data: DependencyYAML):
with open(file, "w") as yaml_file:
yaml.safe_dump(data, yaml_file, sort_keys=False)
class Dependency:
def __init__(self, path: str, values: DependencyDict):
self.path = path
self.values = values
def __init__(self, path: str, values: DependencyDict):
self.path = path
self.values = values
self.name: str = ""
self.desc: str = ""
self.kind: str = ""
self.name: str = ""
self.desc: str = ""
self.kind: str = ""
match path.split("/"):
case ["plugins", name]:
self.name = name
self.kind = "plugin"
self.desc = f"{name} plugin"
case ["themes", name]:
self.name = name.replace(".zsh-theme", "")
self.kind = "theme"
self.desc = f"{self.name} theme"
case _:
self.name = self.desc = path
match path.split("/"):
case ["plugins", name]:
self.name = name
self.kind = "plugin"
self.desc = f"{name} plugin"
case ["themes", name]:
self.name = name.replace(".zsh-theme", "")
self.kind = "theme"
self.desc = f"{self.name} theme"
case _:
self.name = self.desc = path
def __str__(self):
output: str = ""
for key in DependencyDict.__dict__['__annotations__'].keys():
if key not in self.values:
output += f"{key}: None\n"
continue
def __str__(self):
output: str = ""
for key in DependencyDict.__dict__["__annotations__"].keys():
if key not in self.values:
output += f"{key}: None\n"
continue
value = self.values[key]
if "\n" not in value:
output += f"{key}: {value}\n"
else:
output += f"{key}:\n "
output += value.replace("\n", "\n ", value.count("\n") - 1)
return output
value = self.values[key]
if "\n" not in value:
output += f"{key}: {value}\n"
else:
output += f"{key}:\n "
output += value.replace("\n", "\n ", value.count("\n") - 1)
return output
def update_or_notify(self):
# Print dependency settings
print(f"Processing {self.desc}...", file=sys.stderr)
print(self, file=sys.stderr)
def update_or_notify(self):
# Print dependency settings
print(f"Processing {self.desc}...", file=sys.stderr)
print(self, file=sys.stderr)
# Check for updates
repo = self.values["repo"]
remote_branch = self.values["branch"]
version = self.values["version"]
is_tag = version.startswith("tag:")
try:
with CodeTimer(f"update check: {repo}"):
if is_tag:
status = GitHub.check_newer_tag(repo, version.replace("tag:", ""))
else:
status = GitHub.check_updates(repo, remote_branch, version)
if status["has_updates"]:
short_sha = status["head_ref"][:8]
new_version = status["version"] if is_tag else short_sha
# Check for updates
repo = self.values["repo"]
remote_branch = self.values["branch"]
version = self.values["version"]
is_tag = version.startswith("tag:")
try:
# Create new branch
branch = Git.create_branch(self.path, new_version)
with CodeTimer(f"update check: {repo}"):
if is_tag:
status = GitHub.check_newer_tag(repo, version.replace("tag:", ""))
else:
status = GitHub.check_updates(repo, remote_branch, version)
# Update dependencies.yml file
self.__update_yaml(f"tag:{new_version}" if is_tag else status["version"])
if status["has_updates"] is True:
short_sha = status["head_ref"][:8]
new_version = status["version"] if is_tag else short_sha
# Update dependency files
self.__apply_upstream_changes()
try:
branch_name = f"update/{self.path}/{new_version}"
# Add all changes and commit
Git.add_and_commit(self.name, short_sha)
# Create new branch
branch = Git.checkout_or_create_branch(branch_name)
# Push changes to remote
Git.push(branch)
# Update dependencies.yml file
self.__update_yaml(
f"tag:{new_version}" if is_tag else status["version"]
)
# Create GitHub PR
GitHub.create_pr(
branch,
f"feat({self.name}): update to version {new_version}",
f"""## Description
# Update dependency files
self.__apply_upstream_changes()
# Add all changes and commit
Git.add_and_commit(self.name, short_sha)
# Push changes to remote
Git.push(branch)
# Create GitHub PR
GitHub.create_pr(
branch,
f"feat({self.name}): update to version {new_version}",
f"""## Description
Update for **{self.desc}**: update to version [{new_version}]({status['head_url']}).
Check out the [list of changes]({status['compare_url']}).
"""
)
""",
)
# Clean up repository
Git.clean_repo()
except (CommandRunner.Exception, shutil.Error) as e:
# Handle exception on automatic update
match type(e):
case CommandRunner.Exception:
# Print error message
print(f"Error running {e.stage} command: {e.returncode}", file=sys.stderr)
print(e.stderr, file=sys.stderr)
case shutil.Error:
print(f"Error copying files: {e}", file=sys.stderr)
# Clean up repository
Git.clean_repo()
except (CommandRunner.Exception, shutil.Error) as e:
# Handle exception on automatic update
match type(e):
case CommandRunner.Exception:
# Print error message
print(
f"Error running {e.stage} command: {e.returncode}", # pyright: ignore[reportAttributeAccessIssue]
file=sys.stderr,
)
print(e.stderr, file=sys.stderr) # pyright: ignore[reportAttributeAccessIssue]
case shutil.Error:
print(f"Error copying files: {e}", file=sys.stderr)
try:
Git.clean_repo()
except CommandRunner.Exception as e:
print(f"Error reverting repository to clean state: {e}", file=sys.stderr)
sys.exit(1)
try:
Git.clean_repo()
except CommandRunner.Exception as e:
print(
f"Error reverting repository to clean state: {e}",
file=sys.stderr,
)
sys.exit(1)
# Create a GitHub issue to notify maintainer
title = f"{self.path}: update to {new_version}"
body = (
f"""## Description
# Create a GitHub issue to notify maintainer
title = f"{self.path}: update to {new_version}"
body = f"""## Description
There is a new version of `{self.name}` {self.kind} available.
New version: [{new_version}]({status['head_url']})
Check out the [list of changes]({status['compare_url']}).
"""
)
print(f"Creating GitHub issue", file=sys.stderr)
print(f"{title}\n\n{body}", file=sys.stderr)
GitHub.create_issue(title, body)
except Exception as e:
print(e, file=sys.stderr)
print("Creating GitHub issue", file=sys.stderr)
print(f"{title}\n\n{body}", file=sys.stderr)
GitHub.create_issue(title, body)
except Exception as e:
print(e, file=sys.stderr)
def __update_yaml(self, new_version: str) -> None:
dep_yaml = DependencyStore.update_dependency_version(self.path, new_version)
DependencyStore.write_store(DEPS_YAML_FILE, dep_yaml)
def __update_yaml(self, new_version: str) -> None:
dep_yaml = DependencyStore.update_dependency_version(self.path, new_version)
DependencyStore.write_store(DEPS_YAML_FILE, dep_yaml)
def __apply_upstream_changes(self) -> None:
# Patterns to ignore in copying files from upstream repo
GLOBAL_IGNORE = [
".git",
".github",
".gitignore"
]
def __apply_upstream_changes(self) -> None:
# Patterns to ignore in copying files from upstream repo
GLOBAL_IGNORE = [".git", ".github", ".gitignore"]
path = os.path.abspath(self.path)
precopy = self.values.get("precopy")
postcopy = self.values.get("postcopy")
path = os.path.abspath(self.path)
precopy = self.values.get("precopy")
postcopy = self.values.get("postcopy")
repo = self.values["repo"]
branch = self.values["branch"]
remote_url = f"https://github.com/{repo}.git"
repo_dir = os.path.join(TMP_DIR, repo)
repo = self.values["repo"]
branch = self.values["branch"]
remote_url = f"https://github.com/{repo}.git"
repo_dir = os.path.join(TMP_DIR, repo)
# Clone repository
Git.clone(remote_url, branch, repo_dir, reclone=True)
# Clone repository
Git.clone(remote_url, branch, repo_dir, reclone=True)
# Run precopy on tmp repo
if precopy is not None:
print("Running precopy script:", end="\n ", file=sys.stderr)
print(precopy.replace("\n", "\n ", precopy.count("\n") - 1), file=sys.stderr)
CommandRunner.run_or_fail(["bash", "-c", precopy], cwd=repo_dir, stage="Precopy")
# Run precopy on tmp repo
if precopy is not None:
print("Running precopy script:", end="\n ", file=sys.stderr)
print(
precopy.replace("\n", "\n ", precopy.count("\n") - 1), file=sys.stderr
)
CommandRunner.run_or_fail(
["bash", "-c", precopy], cwd=repo_dir, stage="Precopy"
)
# Copy files from upstream repo
print(f"Copying files from {repo_dir} to {path}", file=sys.stderr)
shutil.copytree(repo_dir, path, dirs_exist_ok=True, ignore=shutil.ignore_patterns(*GLOBAL_IGNORE))
# Copy files from upstream repo
print(f"Copying files from {repo_dir} to {path}", file=sys.stderr)
shutil.copytree(
repo_dir,
path,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns(*GLOBAL_IGNORE),
)
# Run postcopy on our repository
if postcopy is not None:
print("Running postcopy script:", end="\n ", file=sys.stderr)
print(postcopy.replace("\n", "\n ", postcopy.count("\n") - 1), file=sys.stderr)
CommandRunner.run_or_fail(["bash", "-c", postcopy], cwd=path, stage="Postcopy")
# Run postcopy on our repository
if postcopy is not None:
print("Running postcopy script:", end="\n ", file=sys.stderr)
print(
postcopy.replace("\n", "\n ", postcopy.count("\n") - 1),
file=sys.stderr,
)
CommandRunner.run_or_fail(
["bash", "-c", postcopy], cwd=path, stage="Postcopy"
)
class Git:
default_branch = "master"
default_branch = "master"
@staticmethod
def clone(remote_url: str, branch: str, repo_dir: str, reclone=False):
# If repo needs to be fresh
if reclone and os.path.exists(repo_dir):
shutil.rmtree(repo_dir)
@staticmethod
def clone(remote_url: str, branch: str, repo_dir: str, reclone=False):
# If repo needs to be fresh
if reclone and os.path.exists(repo_dir):
shutil.rmtree(repo_dir)
# Clone repo in tmp directory and checkout branch
if not os.path.exists(repo_dir):
print(f"Cloning {remote_url} to {repo_dir} and checking out {branch}", file=sys.stderr)
CommandRunner.run_or_fail(["git", "clone", "--depth=1", "-b", branch, remote_url, repo_dir], stage="Clone")
# Clone repo in tmp directory and checkout branch
if not os.path.exists(repo_dir):
print(
f"Cloning {remote_url} to {repo_dir} and checking out {branch}",
file=sys.stderr,
)
CommandRunner.run_or_fail(
["git", "clone", "--depth=1", "-b", branch, remote_url, repo_dir],
stage="Clone",
)
@staticmethod
def create_branch(path: str, version: str):
# Get current branch name
result = CommandRunner.run_or_fail(["git", "rev-parse", "--abbrev-ref", "HEAD"], stage="GetDefaultBranch")
Git.default_branch = result.stdout.decode("utf-8").strip()
@staticmethod
def checkout_or_create_branch(branch_name: str):
# Get current branch name
result = CommandRunner.run_or_fail(
["git", "rev-parse", "--abbrev-ref", "HEAD"], stage="GetDefaultBranch"
)
Git.default_branch = result.stdout.decode("utf-8").strip()
# Create new branch and return created branch name
branch_name = f"update/{path}/{version}"
CommandRunner.run_or_fail(["git", "checkout", "-b", branch_name], stage="CreateBranch")
return branch_name
# Create new branch and return created branch name
try:
# try to checkout already existing branch
CommandRunner.run_or_fail(
["git", "checkout", branch_name], stage="CreateBranch"
)
except CommandRunner.Exception:
# otherwise create new branch
CommandRunner.run_or_fail(
["git", "checkout", "-b", branch_name], stage="CreateBranch"
)
return branch_name
@staticmethod
def add_and_commit(scope: str, version: str):
user_name = os.environ.get("GIT_APP_NAME")
user_email = os.environ.get("GIT_APP_EMAIL")
@staticmethod
def add_and_commit(scope: str, version: str):
user_name = os.environ.get("GIT_APP_NAME")
user_email = os.environ.get("GIT_APP_EMAIL")
# Add all files to git staging
CommandRunner.run_or_fail(["git", "add", "-A", "-v"], stage="AddFiles")
# Add all files to git staging
CommandRunner.run_or_fail(["git", "add", "-A", "-v"], stage="AddFiles")
# Reset environment and git config
clean_env = os.environ.copy()
clean_env["LANG"]="C.UTF-8"
clean_env["GIT_CONFIG_GLOBAL"]="/dev/null"
clean_env["GIT_CONFIG_NOSYSTEM"]="1"
# Reset environment and git config
clean_env = os.environ.copy()
clean_env["LANG"] = "C.UTF-8"
clean_env["GIT_CONFIG_GLOBAL"] = "/dev/null"
clean_env["GIT_CONFIG_NOSYSTEM"] = "1"
# Commit with settings above
CommandRunner.run_or_fail([
"git",
"-c", f"user.name={user_name}",
"-c", f"user.email={user_email}",
"commit",
"-m", f"feat({scope}): update to {version}"
], stage="CreateCommit", env=clean_env)
# check if repo is clean (clean => no error, no commit)
try:
CommandRunner.run_or_fail(
["git", "diff", "--exit-code"], stage="CheckRepoClean", env=clean_env
)
except CommandRunner.Exception:
# Commit with settings above
CommandRunner.run_or_fail(
[
"git",
"-c",
f"user.name={user_name}",
"-c",
f"user.email={user_email}",
"commit",
"-m",
f"feat({scope}): update to {version}",
],
stage="CreateCommit",
env=clean_env,
)
@staticmethod
def push(branch: str):
CommandRunner.run_or_fail(["git", "push", "-u", "origin", branch], stage="PushBranch")
@staticmethod
def push(branch: str):
CommandRunner.run_or_fail(
["git", "push", "-u", "origin", branch], stage="PushBranch"
)
@staticmethod
def clean_repo():
CommandRunner.run_or_fail(["git", "reset", "--hard", "HEAD"], stage="ResetRepository")
CommandRunner.run_or_fail(["git", "checkout", Git.default_branch], stage="CheckoutDefaultBranch")
@staticmethod
def clean_repo():
CommandRunner.run_or_fail(
["git", "reset", "--hard", "HEAD"], stage="ResetRepository"
)
CommandRunner.run_or_fail(
["git", "checkout", Git.default_branch], stage="CheckoutDefaultBranch"
)
class GitHub:
@staticmethod
def check_newer_tag(repo, current_tag) -> UpdateStatus:
# GET /repos/:owner/:repo/git/refs/tags
url = f"https://api.github.com/repos/{repo}/git/refs/tags"
@staticmethod
def check_newer_tag(repo, current_tag) -> UpdateStatusFalse | UpdateStatusTrue:
# GET /repos/:owner/:repo/git/refs/tags
url = f"https://api.github.com/repos/{repo}/git/refs/tags"
# Send a GET request to the GitHub API
response = requests.get(url)
# Send a GET request to the GitHub API
response = requests.get(url)
current_version = coerce(current_tag)
if current_version is None:
raise ValueError(
f"Stored {current_version} from {repo} does not follow semver"
)
# If the request was successful
if response.status_code == 200:
# Parse the JSON response
data = response.json()
# If the request was successful
if response.status_code == 200:
# Parse the JSON response
data = response.json()
if len(data) == 0:
return {
"has_updates": False,
}
if len(data) == 0:
return {
"has_updates": False,
}
latest_ref = data[-1]
latest_tag = latest_ref["ref"].replace("refs/tags/", "")
latest_ref = None
latest_version: Optional[Version] = None
for ref in data:
# we find the tag since GitHub returns it as plain git ref
tag_version = coerce(ref["ref"].replace("refs/tags/", ""))
if tag_version is None:
# we skip every tag that is not semver-complaint
continue
if latest_version is None or tag_version.compare(latest_version) > 0:
# if we have a "greater" semver version, set it as latest
latest_version = tag_version
latest_ref = ref
if latest_tag == current_tag:
return {
"has_updates": False,
}
# raise if no valid semver tag is found
if latest_ref is None or latest_version is None:
raise ValueError(f"No tags following semver found in {repo}")
return {
"has_updates": True,
"version": latest_tag,
"compare_url": f"https://github.com/{repo}/compare/{current_tag}...{latest_tag}",
"head_ref": latest_ref["object"]["sha"],
"head_url": f"https://github.com/{repo}/releases/tag/{latest_tag}",
}
else:
# If the request was not successful, raise an exception
raise Exception(f"GitHub API request failed with status code {response.status_code}: {response.json()}")
# we get the tag since GitHub returns it as plain git ref
latest_tag = latest_ref["ref"].replace("refs/tags/", "")
@staticmethod
def check_updates(repo, branch, version) -> UpdateStatus:
# TODO: add support for semver updating (based on tags)
# Check if upstream github repo has a new version
# GitHub API URL for comparing two commits
url = f"https://api.github.com/repos/{repo}/compare/{version}...{branch}"
if latest_version.compare(current_version) <= 0:
return {
"has_updates": False,
}
# Send a GET request to the GitHub API
response = requests.get(url)
return {
"has_updates": True,
"version": latest_tag,
"compare_url": f"https://github.com/{repo}/compare/{current_tag}...{latest_tag}",
"head_ref": latest_ref["object"]["sha"],
"head_url": f"https://github.com/{repo}/releases/tag/{latest_tag}",
}
else:
# If the request was not successful, raise an exception
raise Exception(
f"GitHub API request failed with status code {response.status_code}: {response.json()}"
)
# If the request was successful
if response.status_code == 200:
# Parse the JSON response
data = response.json()
@staticmethod
def check_updates(repo, branch, version) -> UpdateStatusFalse | UpdateStatusTrue:
url = f"https://api.github.com/repos/{repo}/compare/{version}...{branch}"
# If the base is behind the head, there is a newer version
has_updates = data["status"] != "identical"
# Send a GET request to the GitHub API
response = requests.get(url)
if not has_updates:
return {
"has_updates": False,
}
# If the request was successful
if response.status_code == 200:
# Parse the JSON response
data = response.json()
return {
"has_updates": data["status"] != "identical",
"version": data["commits"][-1]["sha"],
"compare_url": data["permalink_url"],
"head_ref": data["commits"][-1]["sha"],
"head_url": data["commits"][-1]["html_url"]
}
else:
# If the request was not successful, raise an exception
raise Exception(f"GitHub API request failed with status code {response.status_code}: {response.json()}")
# If the base is behind the head, there is a newer version
has_updates = data["status"] != "identical"
@staticmethod
def create_issue(title: str, body: str) -> None:
cmd = [
"gh",
"issue",
"create",
"-t", title,
"-b", body
]
CommandRunner.run_or_fail(cmd, stage="CreateIssue")
if not has_updates:
return {
"has_updates": False,
}
@staticmethod
def create_pr(branch: str, title: str, body: str) -> None:
cmd = [
"gh",
"pr",
"create",
"-B", Git.default_branch,
"-H", branch,
"-t", title,
"-b", body
]
CommandRunner.run_or_fail(cmd, stage="CreatePullRequest")
return {
"has_updates": data["status"] != "identical",
"version": data["commits"][-1]["sha"],
"compare_url": data["permalink_url"],
"head_ref": data["commits"][-1]["sha"],
"head_url": data["commits"][-1]["html_url"],
}
else:
# If the request was not successful, raise an exception
raise Exception(
f"GitHub API request failed with status code {response.status_code}: {response.json()}"
)
@staticmethod
def create_issue(title: str, body: str) -> None:
cmd = ["gh", "issue", "create", "-t", title, "-b", body]
CommandRunner.run_or_fail(cmd, stage="CreateIssue")
@staticmethod
def create_pr(branch: str, title: str, body: str) -> None:
# first of all let's check if PR is already open
check_cmd = [
"gh",
"pr",
"list",
"--state",
"open",
"--head",
branch,
"--json",
"title",
]
# returncode is 0 also if no PRs are found
output = json.loads(
CommandRunner.run_or_fail(check_cmd, stage="CheckPullRequestOpen")
.stdout.decode("utf-8")
.strip()
)
# we have PR in this case!
if len(output) > 0:
return
cmd = [
"gh",
"pr",
"create",
"-B",
Git.default_branch,
"-H",
branch,
"-t",
title,
"-b",
body,
]
CommandRunner.run_or_fail(cmd, stage="CreatePullRequest")
def main():
# Load the YAML file
with open(DEPS_YAML_FILE, "r") as yaml_file:
data: DependencyYAML = yaml.safe_load(yaml_file)
# Load the YAML file
with open(DEPS_YAML_FILE, "r") as yaml_file:
data: DependencyYAML = yaml.safe_load(yaml_file)
if "dependencies" not in data:
raise Exception(f"dependencies.yml not properly formatted")
if "dependencies" not in data:
raise Exception("dependencies.yml not properly formatted")
# Cache YAML version
DependencyStore.set(data)
# Cache YAML version
DependencyStore.set(data)
dependencies = data["dependencies"]
for path in dependencies:
dependency = Dependency(path, dependencies[path])
dependency.update_or_notify()
dependencies = data["dependencies"]
for path in dependencies:
dependency = Dependency(path, dependencies[path])
dependency.update_or_notify()
if __name__ == "__main__":
main()
main()
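
Most of this compare rewrites the updater so that `tag:`-pinned dependencies are resolved by semver comparison instead of taking the last ref returned by the GitHub API. A condensed sketch of that idea (illustrative only; the tag list is made up, and the real script additionally builds compare URLs, commits the bump, and opens a PR):

```python
# Sketch: coerce loose tag names ("0.6", "v0.6.1", ...) into semver Versions
# and keep the greatest one, as the new coerce/check_newer_tag logic does.
import re
from typing import Optional

from semver import Version

BASEVERSION = re.compile(
    r"[vV]?(?P<major>(0|[1-9])\d*)(\.(?P<minor>(0|[1-9])\d*)(\.(?P<patch>(0|[1-9])\d*))?)?"
)

def coerce(tag: str) -> Optional[Version]:
    match = BASEVERSION.search(tag)
    if not match:
        return None  # tag is not even loosely semver-shaped, skip it
    parts = {key: 0 if value is None else int(value) for key, value in match.groupdict().items()}
    return Version(**parts)  # missing minor/patch default to 0

tags = ["v0.5.2", "0.6", "some-release", "v0.6.1"]  # hypothetical tag refs
versions = [v for v in map(coerce, tags) if v is not None]
print(max(versions))  # -> 0.6.1
```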

View File

@@ -4,6 +4,8 @@ function fzf_setup_using_fzf() {
# we remove "fzf " prefix, this fixes really old fzf versions behaviour
# see https://github.com/ohmyzsh/ohmyzsh/issues/12387
local fzf_ver=${"$(fzf --version)"#fzf }
autoload -Uz is-at-least
is-at-least 0.48.0 ${${(s: :)fzf_ver}[1]} || return 1
eval "$(fzf --zsh)"

View File

@@ -57,6 +57,24 @@ wd() {
}
```
### [Home Manager](https://github.com/nix-community/home-manager)
Add the following to your `home.nix` then run `home-manager switch`:
```nix
programs.zsh.plugins = [
{
name = "wd";
src = pkgs.fetchFromGitHub {
owner = "mfaerevaag";
repo = "wd";
rev = "v0.5.2";
sha256 = "sha256-4yJ1qhqhNULbQmt6Z9G22gURfDLe30uV1ascbzqgdhg=";
};
}
];
```
### [zplug](https://github.com/zplug/zplug)
```zsh
@@ -119,6 +137,14 @@ Also, you may have to force a rebuild of `zcompdump` by running:
rm -f ~/.zcompdump; compinit
```
## Browse
If you want to make use of the `fzf`-powered browse feature to fuzzy search through all your warp points, set up a keybind in your `.zshrc`:
```zsh
bindkey '^G' wd_browse
```
## Usage
* Add warp point to current working directory:
@@ -153,7 +179,7 @@ wd ..
wd ...
```
This is a wrapper for the zsh's `dirs` function.
This is a wrapper for the zsh's `dirs` function.
_You might need to add `setopt AUTO_PUSHD` to your `.zshrc` if you are not using [oh-my-zsh](https://github.com/ohmyzsh/ohmyzsh)._
* Remove warp point:

View File

@@ -8,8 +8,13 @@
# @github.com/mfaerevaag/wd
# Handle $0 according to the standard:
# https://zdharma-continuum.github.io/Zsh-100-Commits-Club/Zsh-Plugin-Standard.html
# # https://zdharma-continuum.github.io/Zsh-100-Commits-Club/Zsh-Plugin-Standard.html
0="${${ZERO:-${0:#$ZSH_ARGZERO}}:-${(%):-%N}}"
0="${${(M)0:#/*}:-$PWD/$0}"
eval "wd() { source '${0:A:h}/wd.sh' }"
wd > /dev/null
# Register the function as a Zsh widget
zle -N wd_browse
# Bind the widget to a key combination
bindkey '^G' wd_browse

plugins/wd/wd.sh Normal file → Executable file
View File

@@ -8,7 +8,7 @@
# @github.com/mfaerevaag/wd
# version
readonly WD_VERSION=0.5.0
readonly WD_VERSION=0.6.1
# colors
readonly WD_BLUE="\033[96m"
@@ -57,12 +57,11 @@ wd_print_msg()
{
if [[ -z $wd_quiet_mode ]]
then
local color=$1
local msg=$2
local color="${1:-$WD_BLUE}" # Default to blue if no color is provided
local msg="$2"
if [[ $color == "" || $msg == "" ]]
then
print " ${WD_RED}*${WD_NOC} Could not print message. Sorry!"
if [[ -z "$msg" ]]; then
print "${WD_RED}*${WD_NOC} Could not print message. Sorry!"
else
print " ${color}*${WD_NOC} ${msg}"
fi
@@ -230,6 +229,20 @@ wd_remove()
done
}
wd_browse() {
if ! command -v fzf >/dev/null; then
echo "This functionality requires fzf. Please install fzf first."
return 1
fi
local entries=("${(@f)$(sed "s:${HOME}:~:g" "$WD_CONFIG" | awk -F ':' '{print $1 " -> " $2}')}")
local selected_entry=$(printf '%s\n' "${entries[@]}" | fzf --height 40% --reverse)
if [[ -n $selected_entry ]]; then
local selected_point="${selected_entry%% ->*}"
selected_point=$(echo "$selected_point" | xargs)
wd $selected_point
fi
}
wd_list_all()
{
wd_print_msg "$WD_BLUE" "All warp points:"
@@ -396,7 +409,9 @@ fi
# disable extendedglob for the complete wd execution time
setopt | grep -q extendedglob
wd_extglob_is_set=$?
(( ! $wd_extglob_is_set )) && setopt noextendedglob
if (( wd_extglob_is_set == 0 )); then
setopt noextendedglob
fi
# load warp points
typeset -A points
@@ -436,6 +451,10 @@ else
wd_add "$2" "$wd_force_mode"
break
;;
"-b"|"browse")
wd_browse
break
;;
"-e"|"export")
wd_export_static_named_directories
break
@@ -484,7 +503,9 @@ fi
# if not, next time warp will pick up variables from this run
# remember, there's no sub shell
(( ! $wd_extglob_is_set )) && setopt extendedglob
if (( wd_extglob_is_set == 0 )); then
setopt extendedglob
fi
unset wd_extglob_is_set
unset wd_warp