From 03ea0ff5dcee3410d5aa477541c0ef642da4f7ae Mon Sep 17 00:00:00 2001 From: Alyssa Ross Date: Tue, 4 Feb 2020 21:26:23 +0000 Subject: Initial commit --- src/main.rs | 571 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 571 insertions(+) create mode 100644 src/main.rs (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..ab3e918 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,571 @@ +use clap::clap_app; +use failure::{err_msg, format_err, Error}; +use isahc::prelude::*; +use serde::Deserialize; +use std::ffi::CString; +use std::ffi::{OsStr, OsString}; +use std::io::{self, stderr, stdin, BufRead, BufReader, ErrorKind, Read, Write}; +use std::mem::MaybeUninit; +use std::os::unix::prelude::*; +use std::path::Path; +use std::process::{exit, Command, Stdio}; + +use libc::{ + gethostname, posix_spawn_file_actions_adddup2, posix_spawn_file_actions_init, posix_spawnp, + waitpid, _SC_HOST_NAME_MAX, +}; + +extern "C" { + static mut environ: *mut *mut libc::c_char; +} + +type Result = std::result::Result; + +fn copy_stream(src: &mut dyn Read, dst: &mut dyn Write) -> io::Result<()> { + let mut buf = vec![0; 65536]; + loop { + match src.read(&mut buf) { + Ok(0) => return Ok(()), + Ok(len) => dst.write_all(&buf[..len])?, + Err(e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => return Err(e), + } + } +} + +fn hostname() -> OsString { + let mut bytes = vec![0; _SC_HOST_NAME_MAX as usize + 2]; + + if unsafe { gethostname(bytes.as_mut_ptr(), bytes.len()) } == -1 { + panic!("gethostname failed: {}", std::io::Error::last_os_error()); + } + + if bytes[_SC_HOST_NAME_MAX as usize + 1] != 0 { + panic!("hostname longer than HOST_NAME_MAX"); + } + + let bytes: Vec<_> = bytes + .into_iter() + .take_while(|b| *b != 0) + .map(|b| b as u8) + .collect(); + OsString::from_vec(bytes) +} + +#[derive(Clone, Debug)] +struct Config<'a> { + from: &'a OsStr, + repo_path: &'a Path, + owner: &'a str, + repo: &'a str, + remote_ref: &'a OsStr, + recipients: &'a [&'a str], + token: &'a [u8], + verbose: bool, +} + +impl<'a> Config<'a> { + pub fn from(&self) -> &OsStr { + self.from + } + + pub fn repo_path(&self) -> &Path { + self.repo_path + } + + pub fn remote_ref(&self) -> &OsStr { + self.remote_ref + } + + pub fn owner(&self) -> &str { + self.owner + } + + pub fn repo(&self) -> &str { + self.repo + } +} + +struct Git { + /// Arguments like `--git-dir` that should be applied to every Git + /// command, regardless of subcommand. + global_args: Vec, +} + +impl> Git { + pub fn new(global_args: Vec) -> Self { + Self { global_args } + } + + pub fn git>(&self, subcommand_name: Sub) -> Command { + let mut command = Command::new("git"); + command.args(&self.global_args).arg(subcommand_name); + command + } + + fn git_print_user_info(&self, fd: &dyn AsRawFd, commit: &OsStr) -> Result<()> { + let mut file_actions = MaybeUninit::uninit(); + let r = unsafe { posix_spawn_file_actions_init(file_actions.as_mut_ptr()) }; + if r != 0 { + return Err(std::io::Error::from_raw_os_error(r))?; + } + let mut file_actions = unsafe { file_actions.assume_init() }; + + let r = unsafe { posix_spawn_file_actions_adddup2(&mut file_actions, fd.as_raw_fd(), 1) }; + if r != 0 { + return Err(std::io::Error::from_raw_os_error(r))?; + } + + let mut range = Vec::with_capacity(40 + 1); + range.extend_from_slice(commit.as_bytes()); + range.push(0); + + let global_args: Vec = self + .global_args + .iter() + .map(|arg| { + CString::new(arg.as_ref().as_bytes()).expect("git global_args contains '\0'") + }) + .collect(); + + let c_global_args: Vec<*const u8> = global_args + .iter() + .map(|arg| arg.as_ptr() as *const _) + .collect(); + + let mut argv: Vec<*const u8> = vec![b"git\0" as *const _]; + argv.extend_from_slice(&c_global_args); + argv.push(b"show\0" as *const _); + argv.push(b"--no-patch\0" as *const _); + argv.push(b"--format=Committer: %cn <%ce>\0" as *const _); + argv.push(range.as_ptr()); + argv.push(0 as *const _); + + let mut pid = MaybeUninit::uninit(); + let r = unsafe { + posix_spawnp( + pid.as_mut_ptr(), + argv[0] as *mut _, + &file_actions, + 0 as *mut _, + argv.as_mut_ptr() as *const _, + environ, + ) + }; + if r != 0 { + return Err(std::io::Error::from_raw_os_error(r))?; + } + let pid = unsafe { pid.assume_init() }; + + let mut wstatus = MaybeUninit::uninit(); + if unsafe { waitpid(pid, wstatus.as_mut_ptr(), 0) } == -1 { + return Err(std::io::Error::last_os_error())?; + } + let wstatus = unsafe { wstatus.assume_init() }; + + if (wstatus & 0x7f) != 0 { + return Err(err_msg("git show exited abnormally")); + } + + let status = (wstatus & 0xff00) >> 8; + if status != 0 { + return Err(format_err!("git show exited with status {}", status)); + } + + Ok(()) + } +} + +use graphql_client::GraphQLQuery; + +type GitObjectID = String; + +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "vendor/github_schema.graphql", + query_path = "src/commit_pr.graphql", + response_derives = "Debug" +)] +struct CommitPRQuery; + +#[derive(Debug, Deserialize)] +struct GitHubGraphQLResponse { + data: D, +} + +use commit_pr_query::{CommitPrQueryRepositoryObjectOn, ResponseData}; + +// TODO: lifetime? +struct Client { + endpoint: &'static str, + token: Vec, +} + +use serde::Serialize; + +impl Client { + fn new(token: Vec) -> Self { + Self { + endpoint: "https://api.github.com/graphql", + token, + } + } + + fn request(&self, query: &T) -> Result { + let mut authorization = b"bearer ".to_vec(); + authorization.extend_from_slice(&self.token); + + let response: GitHubGraphQLResponse = Request::post(self.endpoint) + .header("User-Agent", "alyssais") + .header("Authorization", authorization.as_slice()) + .body(serde_json::to_vec(query)?)? + .send()? + .json()?; + + Ok(response.data) + } +} + +struct Run<'a> { + config: &'a Config<'a>, + client: Option, + git: Git<&'a OsStr>, +} + +impl<'a> Run<'a> { + pub fn new(config: &'a Config) -> Self { + Self { + config, + client: None, + git: Git::new(vec![ + OsStr::new("--no-pager"), + OsStr::new("-C"), + config.repo_path().as_os_str(), + ]), + } + } + + fn git>(&self, subcommand_name: Sub) -> Command { + self.git.git(subcommand_name) + } + + fn refspec(&self) -> OsString { + "FETCH_HEAD".into() + } + + fn repo_url(&self) -> String { + format!( + "https://github.com/{}/{}", + self.config.owner(), + self.config.repo() + ) + } + + fn commit_url(&self, commit: &str) -> String { + format!("{}/commit/{}", self.repo_url(), commit) + } + + fn head(&self) -> Result { + let mut out = self + .git("rev-parse") + .arg(self.refspec()) + .stderr(Stdio::inherit()) + .output()? + .stdout; + out.pop(); // Remove trailing newline + Ok(OsString::from_vec(out)) + } + + fn cursor_ref(&self) -> OsString { + let mut cursor_ref: OsString = "pushmail/cursor/github.com/".into(); + cursor_ref.push(self.config.owner()); + cursor_ref.push("/"); + cursor_ref.push(self.config.repo()); + cursor_ref.push("/"); + cursor_ref.push(self.config.remote_ref()); + cursor_ref + } + + fn update_cursor(&self, commit: &str, force: bool) -> Result<()> { + match self + .git("branch") + .args(if force { &["-f"][..] } else { &[][..] }) + .arg("--end-of-options") + .arg(self.cursor_ref()) + .arg(commit) + .status()? + .success() + { + true => Ok(()), + false => Err(err_msg("git branch -f failed")), + } + } + + fn send_email(&self, commit: &str) -> Result<()> { + eprintln!("Sending mail for {}", commit); + + let mut from = OsString::from("--from="); + from.push(self.config.from()); + + let to: Vec<_> = self + .config + .recipients + .iter() + .map(|recipient| format!("--to={}", recipient)) + .collect(); + + let mut message_id_hdr = OsString::from("--add-header=Message-ID: <"); + message_id_hdr.push(commit); + message_id_hdr.push("@"); + message_id_hdr.push(hostname()); + message_id_hdr.push(">"); + + let mut format_patch = self + .git("format-patch") + .stdout(Stdio::piped()) + .arg("--stdout") + .arg(message_id_hdr) + .arg(from) + .args(to) + .arg("--end-of-options") + .arg(format!("{0}~..{0}", commit)) + .spawn()?; + + let mut sendmail = Command::new("sendmail") + .stdin(Stdio::piped()) + .args(self.config.recipients) + .spawn()?; + + let sendmail_in = sendmail.stdin.as_mut().unwrap(); + + let stdout = format_patch.stdout.as_mut().unwrap(); + let patch = BufReader::new(stdout).split(b'\n'); + + #[derive(Copy, Clone, Debug, Eq, PartialEq)] + enum PatchState { + Header, + MessageHeader, + Message, + Commentary, + Diff, + } + + use PatchState::*; + + let mut state = Header; + + for line in patch { + let line = line?; + + let new_state = match (state, line.as_slice()) { + (Header, b"") => MessageHeader, + (MessageHeader, b"") => Message, + (Message, b"---") => Commentary, + (Commentary, l) if l.starts_with(b"diff ") => Diff, // TODO: Rust 1.42 slice literal + _ => state, + }; + + match (state, new_state) { + (MessageHeader, Message) => { + self.git + .git_print_user_info(sendmail_in, OsStr::new(commit))?; + } + + (Commentary, Diff) => { + write!(sendmail_in, " {}\n\n", self.commit_url(commit))?; + } + + _ => {} + } + + sendmail_in.write_all(&line)?; + sendmail_in.write_all(b"\n")?; + + state = new_state; + } + + if !format_patch.wait()?.success() { + return Err(err_msg("git format-patch failed")); + } + + drop(sendmail_in); + + if !sendmail.wait()?.success() { + return Err(err_msg("sendmail failed")); + } + + Ok(()) + } + + fn commit_has_pr(&self, oid: String) -> Result { + if self.config.verbose { + eprintln!("Looking for PR for {}.", oid); + } + + let query = CommitPRQuery::build_query(commit_pr_query::Variables { + owner: self.config.owner().to_string(), + repo: self.config.repo().to_string(), + oid: Some(oid), + }); + + let response = self.client.as_ref().unwrap().request(&query)?; + + fn require(op: &Option) -> Result<&T> { + op.as_ref().ok_or_else(|| err_msg("missing json path")) + } + + let repository = require(&response.repository)?; + + let commit = match require(&repository.object)?.on { + CommitPrQueryRepositoryObjectOn::Commit(ref c) => c, + _ => return Err(err_msg("returned object is not a commit")), + }; + + let pull_requests = require(&commit.associated_pull_requests)?; + let nodes = require(&pull_requests.nodes)?; + + for node in nodes { + let node = require(node)?; + let base_repository = require(&node.base_repository)?; + + if base_repository.owner.login == self.config.owner() { + if self.config.verbose { + eprintln!("Found PR for {}.", query.variables.oid.unwrap()); + } + return Ok(true); + } + } + + if self.config.verbose { + eprintln!("No PR for {}.", query.variables.oid.unwrap()); + } + + Ok(false) + } + + fn run(&mut self) -> Result<()> { + self.git("fetch") + .arg(self.repo_url()) + .arg(self.config.remote_ref()) + .status()?; + + let start = self.cursor_ref(); + let end = self.head()?; + + let mut range = start; + range.push(".."); + range.push(end); + + if self.config.verbose { + eprintln!("Checking {}", range.to_string_lossy()); + } + + let mut log = self + .git("log") + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .arg("--reverse") + .arg("--first-parent") + .arg("--format=%H") + .arg("--end-of-options") + .arg(range) + .spawn()?; + + let log_out = log.stdout.as_mut().expect("git log stdout missing"); + let commits = BufReader::new(log_out).split(b'\n'); + + self.client = Some(Client::new(self.config.token.to_vec())); + + for commit in commits { + // The commit_has_pr() calls could be parallelized, but at the time of writing Rayon + // doesn't support turning a non-indexed parallel iterator into a sequential one, and + // messages need to be sent out sequentially. + + let commit = String::from_utf8(commit?)?; + let has_pr = self.commit_has_pr(commit.clone())?; + + self.update_cursor(&commit, true)?; + + if has_pr { + continue; + } + + self.send_email(&commit)?; + } + + if !log.wait()?.success() { + // If it failed due to the ref not existing, this is the first time we're running. + // So all we need to do is create that ref and exit, and then next run we'll start + // from there.p + let mut arg = self.cursor_ref().to_os_string(); + arg.push("^{commit}"); + if !self + .git("rev-parse") + .arg("--verify") + .arg("-q") + .arg(arg) + .status()? + .success() + { + self.update_cursor("FETCH_HEAD", false)?; + eprintln!("Set FETCH_HEAD as the starting point. Any direct pushes from this point on will"); + eprintln!("generate mail in subsequent pushmail runs."); + return Ok(()); + } + + // Even if stderr is missing, it's probably more appropriate to fail because of the + // failed command at this point. + if let Some(mut log_stderr) = log.stderr { + let _ = copy_stream(&mut log_stderr, &mut stderr()); // Already crashing. + } + + return Err(err_msg("git log failed")); + } + + Ok(()) + } +} + +fn main() { + let matches = clap_app!(pushmail => + (version: "0.1.0") + (author: "Alyssa Ross ") + (about: "Send notification emails when a GitHub repository is pushed to directly.") + (@arg from: -f --from +takes_value "Value for mail From header") + (@arg ref: -r --ref +takes_value "Remote git ref to monitor") + (@arg verbose: -v --verbose "Log more") + (@arg path: +required "Path to local checkout of watched repository") + (@arg repo: +required "GitHub repository to monitor (owner/repo)") + (@arg recipient: +required "Recipient for notification messages") + ) + .get_matches(); + + // Safe because we have ownership of this file descriptor. + let mut token = Vec::with_capacity(41); + stdin().read_to_end(&mut token).unwrap(); + if token.last() == Some(&b'\n') { + token.pop(); + } + + let mut full_repo = matches.value_of("repo").unwrap().splitn(2, '/'); + let owner = full_repo.next().expect("missing repo owner"); + let repo = full_repo.next().expect("missing repo name"); + + let config = Config { + from: matches.value_of_os("from").unwrap(), // TODO: allow omission + repo_path: Path::new(matches.value_of_os("path").unwrap()), + owner, + repo, + remote_ref: matches + .value_of_os("ref") + .unwrap_or_else(|| OsStr::new("HEAD")), + recipients: &[matches.value_of("recipient").unwrap()], + token: &token, + verbose: matches.is_present("verbose"), + }; + + if let Err(error) = Run::new(&config).run() { + eprintln!("{}", error); + exit(1); + } +} -- cgit 1.4.1