浏览代码

Refactoring stream capture pipeline

Fix buggy behavior introduced when using Go stdlib's os.OpenFile which caused some devices to report busy
Add more aggressive ioctl error handling for v4l2 calls including open, QBUF, DQBUFF, etc
Testing using Raspberry Pi's HD camera modules directly
Documentation updates
Vladimir Vivien 3 年之前
父节点
当前提交
4a9080ecba
共有 9 个文件被更改,包括 655 次插入122 次删除
  1. 52 53
      device/device.go
  2. 1 1
      examples/capture0/capture0.go
  3. 30 0
      examples/ccapture/README.md
  4. 434 0
      examples/ccapture/capture.c
  5. 3 0
      v4l2/errors.go
  6. 0 34
      v4l2/ioctl.go
  7. 25 6
      v4l2/streaming.go
  8. 0 28
      v4l2/streaming_loop.go
  9. 110 0
      v4l2/syscalls.go

+ 52 - 53
device/device.go

@@ -2,6 +2,7 @@ package device
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"reflect"
@@ -27,7 +28,12 @@ type Device struct {
 // Open creates opens the underlying device at specified path for streaming.
 // It returns a *Device or an error if unable to open device.
 func Open(path string, options ...Option) (*Device, error) {
-	dev := &Device{path: path, config: config{}}
+	fd, err := v4l2.OpenDevice(path, sys.O_RDWR|sys.O_NONBLOCK, 0)
+	if err != nil {
+		return nil, fmt.Errorf("device open: %w", err)
+	}
+
+	dev := &Device{path: path, config: config{}, fd: fd}
 	// apply options
 	if len(options) > 0 {
 		for _, o := range options {
@@ -35,18 +41,10 @@ func Open(path string, options ...Option) (*Device, error) {
 		}
 	}
 
-	file, err := os.OpenFile(path, sys.O_RDWR|sys.O_NONBLOCK, 0644)
-	//file, err := os.OpenFile(path, sys.O_RDWR, 0644)
-	if err != nil {
-		return nil, fmt.Errorf("device open: %w", err)
-	}
-	dev.file = file
-	dev.fd = file.Fd()
-
 	// get capability
-	cap, err := v4l2.GetCapability(file.Fd())
+	cap, err := v4l2.GetCapability(dev.fd)
 	if err != nil {
-		if err := file.Close(); err != nil {
+		if err := v4l2.CloseDevice(dev.fd); err != nil {
 			return nil, fmt.Errorf("device %s: closing after failure: %s", path, err)
 		}
 		return nil, fmt.Errorf("device open: %s: %w", path, err)
@@ -60,7 +58,7 @@ func Open(path string, options ...Option) (*Device, error) {
 
 	// only supports streaming IO model right now
 	if !dev.cap.IsStreamingSupported() {
-		return nil, fmt.Errorf("device open: only streaming IO is supported")
+		return nil, fmt.Errorf("device open: device does not support streamingIO")
 	}
 
 	switch {
@@ -71,7 +69,7 @@ func Open(path string, options ...Option) (*Device, error) {
 	case cap.IsVideoOutputSupported():
 		dev.bufType = v4l2.BufTypeVideoOutput
 	default:
-		if err := file.Close(); err != nil {
+		if err := v4l2.CloseDevice(dev.fd); err != nil {
 			return nil, fmt.Errorf("device open: %s: closing after failure: %s", path, err)
 		}
 		return nil, fmt.Errorf("device open: %s: %w", path, v4l2.ErrorUnsupportedFeature)
@@ -81,30 +79,36 @@ func Open(path string, options ...Option) (*Device, error) {
 		return nil, fmt.Errorf("device open: does not support buffer stream type")
 	}
 
-	// ensures IOType is set
-	if reflect.ValueOf(dev.config.ioType).IsZero() {
-		dev.config.ioType = v4l2.IOTypeMMAP
+	// ensures IOType is set, only MemMap supported now
+	dev.config.ioType = v4l2.IOTypeMMAP
+
+	// reset crop, only if cropping supported
+	if cropcap, err := v4l2.GetCropCapability(dev.fd, dev.bufType); err == nil {
+		if err := v4l2.SetCropRect(dev.fd, cropcap.DefaultRect); err != nil {
+			// ignore errors
+		}
 	}
 
 	// set pix format
 	if !reflect.ValueOf(dev.config.pixFormat).IsZero() {
 		if err := dev.SetPixFormat(dev.config.pixFormat); err != nil {
-			fmt.Errorf("device open: %s: set format: %w", path, err)
+			return nil, fmt.Errorf("device open: %s: set format: %w", path, err)
 		}
 	} else {
-		if dev.config.pixFormat, err = v4l2.GetPixFormat(dev.fd); err != nil {
-			fmt.Errorf("device open: %s: get pix format: %w", path, err)
+		dev.config.pixFormat, err = v4l2.GetPixFormat(dev.fd)
+		if err != nil {
+			return nil, fmt.Errorf("device open: %s: get default format: %w", path, err)
 		}
 	}
 
 	// set fps
 	if !reflect.ValueOf(dev.config.fps).IsZero() {
 		if err := dev.SetFrameRate(dev.config.fps); err != nil {
-			fmt.Errorf("device open: %s: set fps: %w", path, err)
+			return nil, fmt.Errorf("device open: %s: set fps: %w", path, err)
 		}
 	} else {
 		if dev.config.fps, err = dev.GetFrameRate(); err != nil {
-			fmt.Errorf("device open: %s: get fps: %w", path, err)
+			return nil, fmt.Errorf("device open: %s: get fps: %w", path, err)
 		}
 	}
 
@@ -118,8 +122,7 @@ func (d *Device) Close() error {
 			return err
 		}
 	}
-
-	return d.file.Close()
+	return v4l2.CloseDevice(d.fd)
 }
 
 // Name returns the device name (or path)
@@ -336,9 +339,12 @@ func (d *Device) Start(ctx context.Context) error {
 	// allocate device buffers
 	bufReq, err := v4l2.InitBuffers(d)
 	if err != nil {
-		return fmt.Errorf("device: init buffers: %w", err)
+		return fmt.Errorf("device: requested buffer type not be supported: %w", err)
 	}
-	d.config.bufSize = bufReq.Count // update with granted buf size
+	if bufReq.Count < 2 {
+		return fmt.Errorf("device: %s: issuficient buffer memory", d.path)
+	}
+	d.config.bufSize = bufReq.Count
 	d.requestedBuf = bufReq
 
 	// for each allocated device buf, map into local space
@@ -346,14 +352,6 @@ func (d *Device) Start(ctx context.Context) error {
 		return fmt.Errorf("device: make mapped buffers: %s", err)
 	}
 
-	// Initial enqueue of buffers for capture
-	for i := 0; i < int(d.config.bufSize); i++ {
-		_, err := v4l2.QueueBuffer(d.fd, d.config.ioType, d.bufType, uint32(i))
-		if err != nil {
-			return fmt.Errorf("device: initial buffer queueing: %w", err)
-		}
-	}
-
 	if err := d.startStreamLoop(ctx); err != nil {
 		return fmt.Errorf("device: start stream loop: %s", err)
 	}
@@ -377,9 +375,20 @@ func (d *Device) Stop() error {
 	return nil
 }
 
+// startStreamLoop sets up the loop to run until context is cancelled, and returns immediately
+// and report any errors. The loop runs in a separate goroutine and uses the sys.Select to trigger
+// capture events.
 func (d *Device) startStreamLoop(ctx context.Context) error {
+	// Initial enqueue of buffers for capture
+	for i := 0; i < int(d.config.bufSize); i++ {
+		_, err := v4l2.QueueBuffer(d.fd, d.config.ioType, d.bufType, uint32(i))
+		if err != nil {
+			return fmt.Errorf("device: buffer queueing: %w", err)
+		}
+	}
+
 	if err := v4l2.StreamOn(d); err != nil {
-		return fmt.Errorf("stream loop: stream on: %w", err)
+		return fmt.Errorf("device: stream on: %w", err)
 	}
 
 	go func() {
@@ -394,10 +403,16 @@ func (d *Device) startStreamLoop(ctx context.Context) error {
 			select {
 			// handle stream capture (read from driver)
 			case <-waitForRead:
-				//TODO add better error-handling, for now just panic
-				buff, err := d.prepareCaptureBuffer(fd, ioMemType, bufType)
+				buff, err := v4l2.DequeueBuffer(fd, ioMemType, bufType)
 				if err != nil {
-					panic(fmt.Errorf("stream loop: capture buffer: %s", err).Error())
+					if errors.Is(err, sys.EAGAIN) {
+						continue
+					}
+					panic(fmt.Sprintf("device: stream loop dequeue: %s", err))
+				}
+
+				if _, err := v4l2.QueueBuffer(fd, ioMemType, bufType, buff.Index); err != nil {
+					panic(fmt.Sprintf("device: stream loop queue: %s: buff: %#v", err, buff))
 				}
 
 				d.output <- d.Buffers()[buff.Index][:buff.BytesUsed]
@@ -411,19 +426,3 @@ func (d *Device) startStreamLoop(ctx context.Context) error {
 
 	return nil
 }
-
-// prepareCaptureBuffer prepares a frame buffer for stream capture
-func (d *Device) prepareCaptureBuffer(fd uintptr, ioType v4l2.IOType, bufType v4l2.BufType) (v4l2.Buffer, error) {
-	bufInfo, err := v4l2.DequeueBuffer(fd, ioType, bufType)
-	if err != nil {
-		return v4l2.Buffer{}, fmt.Errorf("capture buffer info: dequeue: %w", err)
-	}
-
-	// requeue/clear used buffer, prepare for next read
-	if _, err := v4l2.QueueBuffer(fd, ioType, bufType, bufInfo.Index); err != nil {
-		return v4l2.Buffer{}, fmt.Errorf("capture buffer info: queue: %w", err)
-	}
-
-	// return captured buffer info
-	return bufInfo, nil
-}

+ 1 - 1
examples/capture0/capture0.go

@@ -19,7 +19,7 @@ func main() {
 	// open device
 	device, err := device.Open(
 		devName,
-		device.WithPixFormat(v4l2.PixFormat{PixelFormat: v4l2.PixelFmtMPEG, Width: 640, Height: 480}),
+		device.WithPixFormat(v4l2.PixFormat{PixelFormat: v4l2.PixelFmtMJPEG, Width: 640, Height: 480, Field: v4l2.FieldInterlaced}),
 	)
 	if err != nil {
 		log.Fatalf("failed to open device: %s", err)

+ 30 - 0
examples/ccapture/README.md

@@ -0,0 +1,30 @@
+# V4L2 video capture example in C
+
+This an example in C showing a minimally required steps to capture video using V4L2. This is can be used to run tests on devices and compare results with the Go4VL code.
+
+## Build and run
+On a Linux machine, run the following:
+
+```
+gcc -o capture capture.c
+```
+
+Run the program using:
+
+```
+./capture
+```
+
+Or, run `--help` to see available flags:
+
+```
+./capture --help
+```
+
+## Debugging with `strace`
+
+To view the ioctl calls made when running the capture program:
+
+```
+strace -o trace.log -e trace=ioctl  ./capture
+```

+ 434 - 0
examples/ccapture/capture.c

@@ -0,0 +1,434 @@
+/*
+ *  V4L2 video capture example
+ *  Used to validate result from test devices.
+ *  Based on https://git.linuxtv.org/v4l-utils.git/
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include <getopt.h>             /* getopt_long() */
+
+#include <fcntl.h>              /* low-level i/o */
+#include <unistd.h>
+#include <errno.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <sys/mman.h>
+#include <sys/ioctl.h>
+
+#include <linux/videodev2.h>
+
+#define CLEAR(x) memset(&(x), 0, sizeof(x))
+
+enum io_method {
+	IO_METHOD_MMAP,
+};
+
+struct buffer {
+	void   *start;
+	size_t  length;
+};
+
+static char            *dev_name;
+static enum io_method   io = IO_METHOD_MMAP;
+static int              fd = -1;
+struct buffer          *buffers;
+static unsigned int     n_buffers;
+static int		out_buf;
+static int              force_format;
+static int              frame_count = 70;
+
+static void errno_exit(const char *s)
+{
+	fprintf(stderr, "%s error %d, %s\n", s, errno, strerror(errno));
+	exit(EXIT_FAILURE);
+}
+
+static int xioctl(int fh, unsigned long int request, void *arg)
+{
+	int r;
+
+	do {
+		r = ioctl(fh, request, arg);
+	} while (-1 == r && EINTR == errno);
+
+	return r;
+}
+
+static void process_image(const void *p, int size)
+{
+	if (out_buf)
+		fwrite(p, size, 1, stdout);
+
+	fflush(stderr);
+	fprintf(stderr, ".");
+	fflush(stdout);
+}
+
+static int read_frame(void)
+{
+	struct v4l2_buffer buf;
+	unsigned int i;
+
+		CLEAR(buf);
+
+		buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+		buf.memory = V4L2_MEMORY_MMAP;
+
+		if (-1 == xioctl(fd, VIDIOC_DQBUF, &buf)) {
+			switch (errno) {
+			case EAGAIN:
+				return 0;
+
+			case EIO:
+				/* Could ignore EIO, see spec. */
+
+				/* fall through */
+
+			default:
+				errno_exit("VIDIOC_DQBUF");
+			}
+		}
+
+		assert(buf.index < n_buffers);
+
+		process_image(buffers[buf.index].start, buf.bytesused);
+
+		if (-1 == xioctl(fd, VIDIOC_QBUF, &buf))
+			errno_exit("VIDIOC_QBUF");
+
+	return 1;
+}
+
+static void mainloop(void)
+{
+	unsigned int count;
+
+	count = frame_count;
+
+	while (count-- > 0) {
+		for (;;) {
+			fd_set fds;
+			struct timeval tv;
+			int r;
+
+			FD_ZERO(&fds);
+			FD_SET(fd, &fds);
+
+			/* Timeout. */
+			tv.tv_sec = 2;
+			tv.tv_usec = 0;
+
+			r = select(fd + 1, &fds, NULL, NULL, &tv);
+
+			if (-1 == r) {
+				if (EINTR == errno)
+					continue;
+				errno_exit("select");
+			}
+
+			if (0 == r) {
+				fprintf(stderr, "select timeout\n");
+				exit(EXIT_FAILURE);
+			}
+
+			if (read_frame())
+				break;
+			/* EAGAIN - continue select loop. */
+		}
+	}
+}
+
+static void stop_capturing(void)
+{
+	enum v4l2_buf_type type;
+
+	type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+	if (-1 == xioctl(fd, VIDIOC_STREAMOFF, &type))
+		errno_exit("VIDIOC_STREAMOFF");
+
+}
+
+static void start_capturing(void)
+{
+	unsigned int i;
+	enum v4l2_buf_type type;
+
+
+            for (i = 0; i < n_buffers; ++i) {
+                struct v4l2_buffer buf;
+
+                CLEAR(buf);
+                buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+                buf.memory = V4L2_MEMORY_MMAP;
+                buf.index = i;
+
+                if (-1 == xioctl(fd, VIDIOC_QBUF, &buf))
+                    errno_exit("VIDIOC_QBUF");
+            }
+		type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+		if (-1 == xioctl(fd, VIDIOC_STREAMON, &type))
+			errno_exit("VIDIOC_STREAMON");
+
+}
+
+static void uninit_device(void)
+{
+	unsigned int i;
+	for (i = 0; i < n_buffers; ++i)
+		if (-1 == munmap(buffers[i].start, buffers[i].length))
+			errno_exit("munmap");
+
+	free(buffers);
+}
+
+static void init_mmap(void)
+{
+	struct v4l2_requestbuffers req;
+
+	CLEAR(req);
+
+	req.count = 4;
+	req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+	req.memory = V4L2_MEMORY_MMAP;
+
+	if (-1 == xioctl(fd, VIDIOC_REQBUFS, &req)) {
+		if (EINVAL == errno) {
+			fprintf(stderr, "%s does not support "
+				 "memory mapping\n", dev_name);
+			exit(EXIT_FAILURE);
+		} else {
+			errno_exit("VIDIOC_REQBUFS");
+		}
+	}
+
+	if (req.count < 2) {
+		fprintf(stderr, "Insufficient buffer memory on %s\n",
+			 dev_name);
+		exit(EXIT_FAILURE);
+	}
+
+	buffers = calloc(req.count, sizeof(*buffers));
+
+	if (!buffers) {
+		fprintf(stderr, "Out of memory\n");
+		exit(EXIT_FAILURE);
+	}
+
+	for (n_buffers = 0; n_buffers < req.count; ++n_buffers) {
+		struct v4l2_buffer buf;
+
+		CLEAR(buf);
+
+		buf.type        = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+		buf.memory      = V4L2_MEMORY_MMAP;
+		buf.index       = n_buffers;
+
+		if (-1 == xioctl(fd, VIDIOC_QUERYBUF, &buf))
+			errno_exit("VIDIOC_QUERYBUF");
+
+		buffers[n_buffers].length = buf.length;
+		buffers[n_buffers].start =
+			mmap(NULL /* start anywhere */,
+			      buf.length,
+			      PROT_READ | PROT_WRITE /* required */,
+			      MAP_SHARED /* recommended */,
+			      fd, buf.m.offset);
+
+		if (MAP_FAILED == buffers[n_buffers].start)
+			errno_exit("mmap");
+	}
+}
+
+static void init_device(void)
+{
+	struct v4l2_capability cap;
+	struct v4l2_cropcap cropcap;
+	struct v4l2_crop crop;
+	struct v4l2_format fmt;
+
+	if (-1 == xioctl(fd, VIDIOC_QUERYCAP, &cap)) {
+		if (EINVAL == errno) {
+			fprintf(stderr, "%s is no V4L2 device\n",
+				 dev_name);
+			exit(EXIT_FAILURE);
+		} else {
+			errno_exit("VIDIOC_QUERYCAP");
+		}
+	}
+
+	if (!(cap.capabilities & V4L2_CAP_VIDEO_CAPTURE)) {
+		fprintf(stderr, "%s is no video capture device\n",
+			 dev_name);
+		exit(EXIT_FAILURE);
+	}
+
+	if (!(cap.capabilities & V4L2_CAP_STREAMING)) {
+		fprintf(stderr, "%s does not support streaming i/o\n", dev_name);
+		exit(EXIT_FAILURE);
+	}
+
+
+	/* Select video input, video standard and tune here. */
+	CLEAR(cropcap);
+
+	cropcap.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+
+	if (0 == xioctl(fd, VIDIOC_CROPCAP, &cropcap)) {
+		crop.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+		crop.c = cropcap.defrect; /* reset to default */
+
+		if (-1 == xioctl(fd, VIDIOC_S_CROP, &crop)) {
+			switch (errno) {
+			case EINVAL:
+				/* Cropping not supported. */
+				break;
+			default:
+				/* Errors ignored. */
+				break;
+			}
+		}
+	} else {
+		/* Errors ignored. */
+	}
+
+
+	CLEAR(fmt);
+
+	fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
+	if (force_format) {
+		fmt.fmt.pix.width       = 640;
+		fmt.fmt.pix.height      = 480;
+		fmt.fmt.pix.pixelformat = V4L2_PIX_FMT_YUYV;
+		fmt.fmt.pix.field       = V4L2_FIELD_INTERLACED;
+
+		if (-1 == xioctl(fd, VIDIOC_S_FMT, &fmt))
+			errno_exit("VIDIOC_S_FMT");
+
+		/* Note VIDIOC_S_FMT may change width and height. */
+	} else {
+		/* Preserve original settings as set by v4l2-ctl for example */
+		if (-1 == xioctl(fd, VIDIOC_G_FMT, &fmt))
+			errno_exit("VIDIOC_G_FMT");
+	}
+
+	init_mmap();
+}
+
+static void close_device(void)
+{
+	if (-1 == close(fd))
+		errno_exit("close");
+
+	fd = -1;
+}
+
+static void open_device(void)
+{
+	struct stat st;
+
+	if (-1 == stat(dev_name, &st)) {
+		fprintf(stderr, "Cannot identify '%s': %d, %s\n",
+			 dev_name, errno, strerror(errno));
+		exit(EXIT_FAILURE);
+	}
+
+	if (!S_ISCHR(st.st_mode)) {
+		fprintf(stderr, "%s is no device\n", dev_name);
+		exit(EXIT_FAILURE);
+	}
+
+	fd = open(dev_name, O_RDWR /* required */ | O_NONBLOCK, 0);
+
+	if (-1 == fd) {
+		fprintf(stderr, "Cannot open '%s': %d, %s\n",
+			 dev_name, errno, strerror(errno));
+		exit(EXIT_FAILURE);
+	}
+}
+
+static void usage(FILE *fp, int argc, char **argv)
+{
+	fprintf(fp,
+		 "Usage: %s [options]\n\n"
+		 "Version 1.3\n"
+		 "Options:\n"
+		 "-d | --device name   Video device name [%s]\n"
+		 "-h | --help          Print this message\n"
+		 "-o | --output        Outputs stream to stdout\n"
+		 "-f | --format        Force format to 640x480 YUYV\n"
+		 "-c | --count         Number of frames to grab [%i]\n"
+		 "",
+		 argv[0], dev_name, frame_count);
+}
+
+static const char short_options[] = "d:hmruofc:";
+
+static const struct option
+long_options[] = {
+	{ "device", required_argument, NULL, 'd' },
+	{ "help",   no_argument,       NULL, 'h' },
+	{ "mmap",   no_argument,       NULL, 'm' },
+	{ "output", no_argument,       NULL, 'o' },
+	{ "format", no_argument,       NULL, 'f' },
+	{ "count",  required_argument, NULL, 'c' },
+	{ 0, 0, 0, 0 }
+};
+
+int main(int argc, char **argv)
+{
+	dev_name = "/dev/video0";
+
+	for (;;) {
+		int idx;
+		int c;
+
+		c = getopt_long(argc, argv,
+				short_options, long_options, &idx);
+
+		if (-1 == c)
+			break;
+
+		switch (c) {
+		case 0: /* getopt_long() flag */
+			break;
+
+		case 'd':
+			dev_name = optarg;
+			break;
+
+		case 'h':
+			usage(stdout, argc, argv);
+			exit(EXIT_SUCCESS);
+
+		case 'o':
+			out_buf++;
+			break;
+
+		case 'c':
+			errno = 0;
+			frame_count = strtol(optarg, NULL, 0);
+			if (errno)
+				errno_exit(optarg);
+			break;
+
+		default:
+			usage(stderr, argc, argv);
+			exit(EXIT_FAILURE);
+		}
+	}
+
+	open_device();
+	init_device();
+	start_capturing();
+	mainloop();
+	stop_capturing();
+	uninit_device();
+	close_device();
+	fprintf(stderr, "\n");
+	return 0;
+}

+ 3 - 0
v4l2/errors.go

@@ -12,12 +12,15 @@ var (
 	ErrorTimeout            = errors.New("timeout error")
 	ErrorUnsupported        = errors.New("unsupported error")
 	ErrorUnsupportedFeature = errors.New("feature unsupported error")
+	ErrorInterrupted        = errors.New("interrupted")
 )
 
 func parseErrorType(errno sys.Errno) error {
 	switch errno {
 	case sys.EBADF, sys.ENOMEM, sys.ENODEV, sys.EIO, sys.ENXIO, sys.EFAULT: // structural, terminal
 		return ErrorSystem
+	case sys.EINTR:
+		return ErrorInterrupted
 	case sys.EINVAL: // bad argument
 		return ErrorBadArgument
 	case sys.ENOTTY: // unsupported

+ 0 - 34
v4l2/ioctl.go

@@ -1,34 +0,0 @@
-package v4l2
-
-import (
-	sys "golang.org/x/sys/unix"
-)
-
-// ioctl is a wrapper for Syscall(SYS_IOCTL)
-func ioctl(fd, req, arg uintptr) (err sys.Errno) {
-	if _, _, errno := sys.Syscall(sys.SYS_IOCTL, fd, req, arg); errno != 0 {
-		if errno != 0 {
-			err = errno
-			return
-		}
-	}
-	return 0
-}
-
-// send sends a request to the kernel (via ioctl syscall)
-func send(fd, req, arg uintptr) error {
-	errno := ioctl(fd, req, arg)
-	if errno == 0 {
-		return nil
-	}
-	parsedErr := parseErrorType(errno)
-	switch parsedErr {
-	case ErrorUnsupported, ErrorSystem, ErrorBadArgument:
-		return parsedErr
-	case ErrorTimeout, ErrorTemporary:
-		// TODO add code for automatic retry/recovery
-		return errno
-	default:
-		return errno
-	}
-}

+ 25 - 6
v4l2/streaming.go

@@ -140,7 +140,7 @@ func StreamOff(dev StreamingDevice) error {
 	return nil
 }
 
-// InitBuffers sends buffer allocation request to initialize buffer IO
+// InitBuffers sends buffer allocation request (VIDIOC_REQBUFS) to initialize buffer IO
 // for video capture or video output when using either mem map, user pointer, or DMA buffers.
 // See https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/vidioc-reqbufs.html#vidioc-reqbufs
 func InitBuffers(dev StreamingDevice) (RequestBuffers, error) {
@@ -153,7 +153,26 @@ func InitBuffers(dev StreamingDevice) (RequestBuffers, error) {
 	req.memory = C.uint(dev.MemIOType())
 
 	if err := send(dev.Fd(), C.VIDIOC_REQBUFS, uintptr(unsafe.Pointer(&req))); err != nil {
-		return RequestBuffers{}, fmt.Errorf("request buffers: %w", err)
+		return RequestBuffers{}, fmt.Errorf("request buffers: %w: type not supported", err)
+	}
+
+	return *(*RequestBuffers)(unsafe.Pointer(&req)), nil
+}
+
+// ResetBuffers allocates a buffer of size 0 VIDIOC_REQBUFS(0) to free (or orphan) all
+// buffers. Useful when shuttingdown the stream.
+// See https://linuxtv.org/downloads/v4l-dvb-apis-new/userspace-api/v4l/vidioc-reqbufs.html
+func ResetBuffers(dev StreamingDevice) (RequestBuffers, error) {
+	if dev.MemIOType() != IOTypeMMAP && dev.MemIOType() != IOTypeDMABuf {
+		return RequestBuffers{}, fmt.Errorf("reset buffers: %w", ErrorUnsupported)
+	}
+	var req C.struct_v4l2_requestbuffers
+	req.count = C.uint(0)
+	req._type = C.uint(dev.BufferType())
+	req.memory = C.uint(dev.MemIOType())
+
+	if err := send(dev.Fd(), C.VIDIOC_REQBUFS, uintptr(unsafe.Pointer(&req))); err != nil {
+		return RequestBuffers{}, fmt.Errorf("reset buffers VIDIOC_REQBUFS(0): %w", err)
 	}
 
 	return *(*RequestBuffers)(unsafe.Pointer(&req)), nil
@@ -168,7 +187,7 @@ func GetBuffer(dev StreamingDevice, index uint32) (Buffer, error) {
 	v4l2Buf.index = C.uint(index)
 
 	if err := send(dev.Fd(), C.VIDIOC_QUERYBUF, uintptr(unsafe.Pointer(&v4l2Buf))); err != nil {
-		return Buffer{}, fmt.Errorf("query buffer: %w", err)
+		return Buffer{}, fmt.Errorf("query buffer: type not supported: %w", err)
 	}
 
 	return makeBuffer(v4l2Buf), nil
@@ -253,9 +272,9 @@ func DequeueBuffer(fd uintptr, ioType IOType, bufType BufType) (Buffer, error) {
 	v4l2Buf._type = C.uint(bufType)
 	v4l2Buf.memory = C.uint(ioType)
 
-	if err := send(fd, C.VIDIOC_DQBUF, uintptr(unsafe.Pointer(&v4l2Buf))); err != nil {
-		return Buffer{}, fmt.Errorf("buffer dequeue: %w", err)
-
+	err := send(fd, C.VIDIOC_DQBUF, uintptr(unsafe.Pointer(&v4l2Buf)))
+	if err != nil {
+		return Buffer{}, fmt.Errorf("buffer dequeue: EGAIN: %w", err)
 	}
 
 	return makeBuffer(v4l2Buf), nil

+ 0 - 28
v4l2/streaming_loop.go

@@ -1,28 +0,0 @@
-package v4l2
-
-import (
-	sys "golang.org/x/sys/unix"
-)
-
-// WaitForRead returns a channel that can be used to be notified when
-// a device's is ready to be read.
-func WaitForRead(dev Device) <-chan struct{} {
-	sigChan := make(chan struct{})
-
-	go func(fd uintptr) {
-		defer close(sigChan)
-		var fdsRead sys.FdSet
-		fdsRead.Set(int(fd))
-		for {
-			n, err := sys.Select(int(fd+1), &fdsRead, nil, nil, nil)
-			if n == -1 {
-				if err == sys.EINTR {
-					continue
-				}
-			}
-			sigChan <- struct{}{}
-		}
-	}(dev.Fd())
-
-	return sigChan
-}

+ 110 - 0
v4l2/syscalls.go

@@ -0,0 +1,110 @@
+package v4l2
+
+import (
+	"errors"
+	"fmt"
+	"io/fs"
+	"os"
+
+	sys "golang.org/x/sys/unix"
+)
+
+// OpenDevice offers a simpler file-open operation than the Go API's os.OpenFile  (the Go API's
+// operation causes some drivers to return busy). It also applies file validation prior to opening the device.
+func OpenDevice(path string, flags int, mode uint32) (uintptr, error) {
+	fstat, err := os.Stat(path)
+	if err != nil {
+		return 0, fmt.Errorf("open device: %w", err)
+	}
+
+	if (fstat.Mode() | fs.ModeCharDevice) == 0 {
+		return 0, fmt.Errorf("device open: %s: not character device", path)
+	}
+
+	return openDev(path, flags, mode)
+}
+
+// openDev offers a simpler file open operation than the Go API OpenFile.
+// See https://cs.opensource.google/go/go/+/refs/tags/go1.19.1:src/os/file_unix.go;l=205
+func openDev(path string, flags int, mode uint32) (uintptr, error) {
+	var fd int
+	var err error
+	for {
+		fd, err = sys.Openat(sys.AT_FDCWD, path, flags, mode)
+		if err == nil {
+			break
+		}
+
+		if errors.Is(err, ErrorInterrupted) {
+			continue //retry
+		}
+
+		return 0, &os.PathError{Op: "open", Path: path, Err: err}
+	}
+	return uintptr(fd), nil
+}
+
+// CloseDevice closes the device.
+func CloseDevice(fd uintptr) error {
+	return closeDev(fd)
+}
+
+func closeDev(fd uintptr) error {
+	return sys.Close(int(fd))
+}
+
+// ioctl is a wrapper for Syscall(SYS_IOCTL)
+func ioctl(fd, req, arg uintptr) (err sys.Errno) {
+	for {
+		_, _, errno := sys.Syscall(sys.SYS_IOCTL, fd, req, arg)
+		switch errno {
+		case 0:
+			return 0
+		case sys.EINTR:
+			continue // retry
+		default:
+			return errno
+		}
+	}
+}
+
+// send sends a request to the kernel (via ioctl syscall)
+func send(fd, req, arg uintptr) error {
+	errno := ioctl(fd, req, arg)
+	if errno == 0 {
+		return nil
+	}
+	parsedErr := parseErrorType(errno)
+	switch parsedErr {
+	case ErrorUnsupported, ErrorSystem, ErrorBadArgument:
+		return parsedErr
+	case ErrorTimeout, ErrorTemporary:
+		// TODO add code for automatic retry/recovery
+		return errno
+	default:
+		return errno
+	}
+}
+
+// WaitForRead returns a channel that can be used to be notified when
+// a device's is ready to be read.
+func WaitForRead(dev Device) <-chan struct{} {
+	sigChan := make(chan struct{})
+
+	go func(fd uintptr) {
+		defer close(sigChan)
+		var fdsRead sys.FdSet
+		fdsRead.Set(int(fd))
+		tv := sys.Timeval{Sec: 2, Usec: 0}
+		for {
+			_, errno := sys.Select(int(fd+1), &fdsRead, nil, nil, &tv)
+			if errno == sys.EINTR {
+				continue
+			}
+
+			sigChan <- struct{}{}
+		}
+	}(dev.Fd())
+
+	return sigChan
+}