about summary refs log tree commit diff
path: root/nixpkgs/pkgs/development/tools/parsing/tree-sitter/update_impl.py
blob: a53e1ee62c366769dfddc18be377389404441dd3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
from urllib.parse import quote
import json
import subprocess as sub
import os
import sys
from typing import Iterator, Any, Literal, TypedDict, Optional
from tempfile import NamedTemporaryFile

# Verbose logging is enabled when the DEBUG environment variable is set (to anything).
debug: bool = True if os.environ.get("DEBUG", False) else False
# Absolute path to an executable binary.
Bin = str
# Static configuration handed in by the caller via the ARGS environment variable.
args: dict[str, Any] = json.loads(os.environ["ARGS"])
# Maps tool names (e.g. "curl", "nix-prefetch-git") to their executable paths.
bins: dict[str, Bin] = args["binaries"]

# Subcommand to run (see the `match mode` dispatch at the bottom of this file).
mode: str = sys.argv[1]
# Mode-specific JSON payload passed as the second CLI argument.
jsonArg: dict = json.loads(sys.argv[2])

# A lazily-built argv sequence for one subprocess invocation.
Args = Iterator[str]


def log(msg: str) -> None:
    """Write a diagnostic message to stderr, keeping stdout free for output."""
    sys.stderr.write(msg + "\n")


def atomically_write(file_path: str, content: bytes) -> None:
    """Atomically write `content` into `file_path`.

    Writes to a temporary file in the target directory and renames it over
    the destination, so readers never observe a partially-written file.

    Raises:
        OSError (or whatever the write/rename raised): the original code
        swallowed exceptions after unlinking the temp file, silently
        reporting success on failure; we now clean up and re-raise.
    """
    tmp = NamedTemporaryFile(
        # write to the parent dir, so that it’s guaranteed to be on the same filesystem
        dir=os.path.dirname(file_path),
        delete=False
    )
    try:
        with tmp:
            tmp.write(content)
            # make sure the bytes are on disk before the rename publishes the file
            tmp.flush()
            os.fsync(tmp.fileno())
        os.rename(
            src=tmp.name,
            dst=file_path
        )
    except Exception:
        os.unlink(tmp.name)
        raise


def curl_github_args(token: str | None, url: str) -> Args:
    """Query the github API via curl"""
    argv: list[str] = [bins["curl"]]
    if not debug:
        argv.append("--silent")
    # follow redirects
    argv.append("--location")
    if token:
        argv += ["-H", f"Authorization: token {token}"]
    argv.append(url)
    yield from argv


def curl_result(output: bytes) -> Any | Literal["not found"]:
    """Parse the curl result of the github API"""
    res: Any = json.loads(output)
    match res:
        case dict(res):
            message: str = res.get("message", "")
            if "rate limit" in message:
                sys.exit("Rate limited by the Github API")
            if "Not Found" in message:
                return "not found"
    # if the result is another type, we can pass it on
    return res


def nix_prefetch_git_args(url: str, version_rev: str) -> Args:
    """Prefetch a git repository"""
    argv: list[str] = [bins["nix-prefetch-git"]]
    if not debug:
        argv.append("--quiet")
    argv += [
        "--no-deepClone",
        "--url", url,
        "--rev", version_rev,
    ]
    yield from argv


def run_cmd(args: Args) -> bytes:
    """Materialize the argv iterator and run it, returning captured stdout.

    Raises sub.CalledProcessError if the command exits non-zero.
    """
    # `argv` instead of `all`: the original shadowed the `all` builtin
    argv = list(args)
    if debug:
        log(str(argv))
    return sub.check_output(argv)


# A filesystem directory path (alias for readability in signatures below).
Dir = str


def fetchRepo() -> None:
    """fetch the given repo and write its nix-prefetch output to the corresponding grammar json file"""
    match jsonArg:
        case {
            "orga": orga,
            "repo": repo,
            "outputDir": outputDir,
            "nixRepoAttrName": nixRepoAttrName,
        }:
            # an optional token raises the github API rate limit
            token: str | None = os.environ.get("GITHUB_TOKEN", None)
            # ask github for the latest release of this repository
            out = run_cmd(
                curl_github_args(
                    token,
                    url=f"https://api.github.com/repos/{quote(orga)}/{quote(repo)}/releases/latest"
                )
            )
            # the git rev to prefetch: a tag name, a branch ref, or HEAD
            release: str
            match curl_result(out):
                case "not found":
                    if "branch" in jsonArg:
                        # explicit branch override from the input json
                        branch = jsonArg.get("branch")
                        release = f"refs/heads/{branch}"
                    else:
                        # github sometimes returns an empty list even though there are releases
                        log(f"uh-oh, latest for {orga}/{repo} is not there, using HEAD")
                        release = "HEAD"
                case {"tag_name": tag_name}:
                    release = tag_name
                case _:
                    sys.exit(f"git result for {orga}/{repo} did not have a `tag_name` field")

            log(f"Fetching latest release ({release}) of {orga}/{repo} …")
            # nix-prefetch-git prints the json blob (url/rev/hash/…) that we
            # store verbatim as the grammar's lockfile
            res = run_cmd(
                nix_prefetch_git_args(
                    url=f"https://github.com/{quote(orga)}/{quote(repo)}",
                    version_rev=release
                )
            )
            atomically_write(
                file_path=os.path.join(
                    outputDir,
                    f"{nixRepoAttrName}.json"
                ),
                content=res
            )
        case _:
            sys.exit("input json must have `orga` and `repo` keys")


def fetchOrgaLatestRepos(orga: str) -> set[str]:
    """fetch the latest (100) repos from the given github organization"""
    token: str | None = os.environ.get("GITHUB_TOKEN", None)
    out = run_cmd(
        curl_github_args(
            token,
            url=f"https://api.github.com/orgs/{quote(orga)}/repos?per_page=100"
        )
    )
    match curl_result(out):
        case "not found":
            sys.exit(f"github organization {orga} not found")
        case list(repos):
            res: list[str] = []
            for repo in repos:
                name = repo.get("name")
                if name:
                    res.append(name)
            return set(res)
        case _:
            sys.exit("github result was not a list of repos, but {other}")


def checkTreeSitterRepos(latest_github_repos: set[str]) -> None:
    """Make sure we know about all tree sitter repos on the tree sitter orga."""
    tracked: set[str] = set(args["knownTreeSitterOrgGrammarRepos"])
    skipped: set[str] = set(args["ignoredTreeSitterOrgRepos"])

    # anything on github that is neither tracked nor explicitly ignored
    unknown = latest_github_repos.difference(tracked.union(skipped))

    if unknown:
        sys.exit(f"These repositories are neither known nor ignored:\n{unknown}")


# Shape of a single grammar entry as it appears in the caller-provided config.
Grammar = TypedDict(
    "Grammar",
    {
        # attribute name used for this grammar in the generated nix expression
        "nixRepoAttrName": str,
        # github organization that owns the repository
        "orga": str,
        # repository name on github
        "repo": str,
        # branch to fetch instead of the latest release; may be None.
        # NOTE(review): fetchRepo tests `"branch" in jsonArg`, so callers
        # appear to omit the key entirely as well — NotRequired would be
        # more precise; confirm before tightening.
        "branch": Optional[str]
    }
)


def printAllGrammarsNixFile() -> None:
    """Print a .nix file that imports all grammars."""
    allGrammars: list[dict[str, Grammar]] = jsonArg["allGrammars"]
    outputDir: Dir = jsonArg["outputDir"]

    # one `attr = lib.importJSON ./attr.json;` line per grammar
    header = ["{ lib }:", "{"]
    body = [
        f"  {g['nixRepoAttrName']} = lib.importJSON ./{g['nixRepoAttrName']}.json;"
        for g in allGrammars
    ]
    footer = ["}", ""]

    atomically_write(
        file_path=os.path.join(outputDir, "default.nix"),
        content="\n".join(header + body + footer).encode()
    )


def fetchAndCheckTreeSitterRepos() -> None:
    """Fetch the tree-sitter org's repo list and check it against our config."""
    log("fetching list of grammars")
    repos = fetchOrgaLatestRepos(orga="tree-sitter")
    log("checking the tree-sitter repo list against the grammars we know")
    checkTreeSitterRepos(repos)


# Entry-point dispatch: the wrapping derivation selects the operation via argv[1].
match mode:
    case "fetch-repo":
        fetchRepo()
    case "fetch-and-check-tree-sitter-repos":
        fetchAndCheckTreeSitterRepos()
    case "print-all-grammars-nix-file":
        printAllGrammarsNixFile()
    case _:
        sys.exit(f"mode {mode} unknown")