// # dnf install golang-x-sys-devel // $ GO111MODULE=off GOPATH=/usr/share/gocode go run async-read.go // // Echoes standard input to standard output line by line, and terminates on // EOF or after 20 seconds. Works with FIFOs or named pipes, pseudo-terminals // and regular files. package main import ( "bufio" "encoding/binary" "errors" "fmt" "io" "os" "time" "golang.org/x/sys/unix" ) var ( ErrCanceled = errors.New("canceled") ErrCLOSED = errors.New("CLOSED") ErrHUP = errors.New("HUP") ) func scan(file *os.File, scanner *bufio.Scanner, doneFD int32) (string, error) { fileFD := int32(file.Fd()) pollFDs := []unix.PollFd{ { Fd: doneFD, Events: unix.POLLIN, Revents: 0, }, { Fd: fileFD, Events: unix.POLLIN, Revents: 0, }, } for { if _, err := unix.Poll(pollFDs, -1); err != nil { if errors.Is(err, unix.EINTR) { fmt.Fprintf(os.Stderr, "poll failed: %s: ignoring\n", err) continue } return "", fmt.Errorf("poll failed: %w", err) } fmt.Fprintf(os.Stderr, "poll revents: 0x%x, 0x%x\n", pollFDs[0].Revents, pollFDs[1].Revents) if pollFDs[0].Revents & unix.POLLIN != 0 { fmt.Fprintf(os.Stderr, "done FD fired\n") for { buffer := make([]byte, 8) if n, err := unix.Read(int(doneFD), buffer); n != len(buffer) || err != nil { break } } return "", ErrCanceled } if pollFDs[0].Revents & unix.POLLNVAL != 0 { fmt.Fprintf(os.Stderr, "done FD fired\n") return "", ErrCanceled } if pollFDs[1].Revents & unix.POLLIN != 0 { if !scanner.Scan() { fmt.Fprintf(os.Stderr, "scanner.Scan() failed\n") if err := scanner.Err(); err != nil { return "", err } else { return "", io.EOF } } text := scanner.Text() return text, nil } if pollFDs[1].Revents & unix.POLLHUP != 0 { return "", ErrHUP } if pollFDs[1].Revents & unix.POLLNVAL != 0 { return "", ErrCLOSED } } } func scanFileAsync(file *os.File, split bufio.SplitFunc, done <-chan struct{}) (<-chan string, <-chan error) { textCh := make(chan string) errCh := make(chan error) scanner := bufio.NewScanner(file) scanner.Split(split) doneFD, err := unix.Eventfd(0, unix.EFD_CLOEXEC | unix.EFD_NONBLOCK) if err != nil { textCh <- "" errCh <- fmt.Errorf("eventfd failed: %w", err) close(textCh) close(errCh) } go func() { defer close(textCh) defer close(errCh) for { text, err := scan(file, scanner, int32(doneFD)) fmt.Fprintf(os.Stderr, "scanned line\n") if err != nil { errCh <- err break } textCh <- text } fmt.Fprintf(os.Stderr, "scan finished\n") }() go func() { defer unix.Close(doneFD) select { case <-done: fmt.Fprintf(os.Stderr, "done channel closed\n") // A varint-encoded uint64 takes a maximum of 10 bytes, // as defined by binary.MaxVarintLen64. However, 1 // byte is enough to encode the number 1. See: // https://protobuf.dev/programming-guides/encoding/ // // An eventfd(2) file descriptor expects a uint64 to be // given to write(2). Luckily, a varint-encoded number // 1 happens to work. buffer := make([]byte, 8) binary.PutUvarint(buffer, 1) if _, err := unix.Write(doneFD, buffer); err != nil { panicMsg := fmt.Sprintf("write to eventfd failed: %s", err) panic(panicMsg) } } }() return textCh, errCh } func main() { done, doneClosed := make(chan struct{}), false defer func() { // To be sure that scanFileAsync terminates cleanly. time.Sleep(1 * time.Second) if !doneClosed { close(done) } }() timer := time.NewTimer(20 * time.Second) defer timer.Stop() textCh, errCh := scanFileAsync(os.Stdin, bufio.ScanLines, done) var errReceived bool for !errReceived { select { case val := <-textCh: fmt.Printf("> %s\n", val) case val := <-errCh: errReceived = true fmt.Printf("%s\n", val) case <-timer.C: fmt.Fprintf(os.Stderr, "timer fired\n"); close(done) doneClosed = true } } }