diff options
Diffstat (limited to 'nixos/tests/google-oslogin/server.py')
-rw-r--r-- | nixos/tests/google-oslogin/server.py | 83 |
1 files changed, 61 insertions, 22 deletions
diff --git a/nixos/tests/google-oslogin/server.py b/nixos/tests/google-oslogin/server.py index bfc527cb97d3..5ea9bbd2c96b 100644 --- a/nixos/tests/google-oslogin/server.py +++ b/nixos/tests/google-oslogin/server.py @@ -7,24 +7,29 @@ import hashlib import base64 from http.server import BaseHTTPRequestHandler, HTTPServer +from urllib.parse import urlparse, parse_qs from typing import Dict SNAKEOIL_PUBLIC_KEY = os.environ['SNAKEOIL_PUBLIC_KEY'] +MOCKUSER="mockuser_nixos_org" +MOCKADMIN="mockadmin_nixos_org" -def w(msg): +def w(msg: bytes): sys.stderr.write(f"{msg}\n") sys.stderr.flush() -def gen_fingerprint(pubkey): +def gen_fingerprint(pubkey: str): decoded_key = base64.b64decode(pubkey.encode("ascii").split()[1]) return hashlib.sha256(decoded_key).hexdigest() -def gen_email(username): + +def gen_email(username: str): """username seems to be a 21 characters long number string, so mimic that in a reproducible way""" return str(int(hashlib.sha256(username.encode()).hexdigest(), 16))[0:21] + def gen_mockuser(username: str, uid: str, gid: str, home_directory: str, snakeoil_pubkey: str) -> Dict: snakeoil_pubkey_fingerprint = gen_fingerprint(snakeoil_pubkey) # seems to be a 21 characters long numberstring, so mimic that in a reproducible way @@ -56,7 +61,8 @@ def gen_mockuser(username: str, uid: str, gid: str, home_directory: str, snakeoi class ReqHandler(BaseHTTPRequestHandler): - def _send_json_ok(self, data): + + def _send_json_ok(self, data: dict): self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() @@ -64,29 +70,62 @@ class ReqHandler(BaseHTTPRequestHandler): w(out) self.wfile.write(out) + def _send_json_success(self, success=True): + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + out = json.dumps({"success": success}).encode() + w(out) + self.wfile.write(out) + + def _send_404(self): + self.send_response(404) + self.end_headers() + def do_GET(self): p = str(self.path) - # mockuser and mockadmin are allowed to login, both use the same snakeoil public key - if p == '/computeMetadata/v1/oslogin/users?username=mockuser' \ - or p == '/computeMetadata/v1/oslogin/users?uid=1009719690': - self._send_json_ok(gen_mockuser(username='mockuser', uid='1009719690', gid='1009719690', - home_directory='/home/mockuser', snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY)) - elif p == '/computeMetadata/v1/oslogin/users?username=mockadmin' \ - or p == '/computeMetadata/v1/oslogin/users?uid=1009719691': - self._send_json_ok(gen_mockuser(username='mockadmin', uid='1009719691', gid='1009719691', - home_directory='/home/mockadmin', snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY)) - - # mockuser is allowed to login - elif p == f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockuser')}&policy=login": - self._send_json_ok({'success': True}) - - # mockadmin may also become root - elif p == f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockadmin')}&policy=login" or p == f"/computeMetadata/v1/oslogin/authorize?email={gen_email('mockadmin')}&policy=adminLogin": - self._send_json_ok({'success': True}) + pu = urlparse(p) + params = parse_qs(pu.query) + + # users endpoint + if pu.path == "/computeMetadata/v1/oslogin/users": + # mockuser and mockadmin are allowed to login, both use the same snakeoil public key + if params.get('username') == [MOCKUSER] or params.get('uid') == ["1009719690"]: + username = MOCKUSER + uid = "1009719690" + elif params.get('username') == [MOCKADMIN] or params.get('uid') == ["1009719691"]: + username = MOCKADMIN + uid = "1009719691" + else: + self._send_404() + return + + self._send_json_ok(gen_mockuser(username=username, uid=uid, gid=uid, home_directory=f"/home/{username}", snakeoil_pubkey=SNAKEOIL_PUBLIC_KEY)) + return + + # authorize endpoint + elif pu.path == "/computeMetadata/v1/oslogin/authorize": + # is user allowed to login? + if params.get("policy") == ["login"]: + # mockuser and mockadmin are allowed to login + if params.get('email') == [gen_email(MOCKUSER)] or params.get('email') == [gen_email(MOCKADMIN)]: + self._send_json_success() + return + self._send_json_success(False) + return + # is user allowed to become root? + elif params.get("policy") == ["adminLogin"]: + # only mockadmin is allowed to become admin + self._send_json_success((params['email'] == [gen_email(MOCKADMIN)])) + return + # send 404 for other policies + else: + self._send_404() + return else: sys.stderr.write(f"Unhandled path: {p}\n") sys.stderr.flush() - self.send_response(501) + self.send_response(404) self.end_headers() self.wfile.write(b'') |