Quellcode durchsuchen

Refactor streaming; separate stream loop from device; introduce new Device interface

Vladimir Vivien vor 3 Jahren
Ursprung
Commit
d1c84f0f99

+ 9 - 8
examples/capture/capture.go

@@ -23,6 +23,11 @@ func main() {
 	}
 	defer device.Close()
 
+	fps, err := device.GetFrameRate()
+	if err != nil {
+		log.Fatalf("failed to get framerate: %s", err)
+	}
+
 	// helper function to search for format descriptions
 	findPreferredFmt := func(fmts []v4l2.FormatDescription, pixEncoding v4l2.FourCCType) *v4l2.FormatDescription {
 		for _, desc := range fmts {
@@ -90,21 +95,17 @@ func main() {
 	log.Printf("Pixel format set to [%s]", pixFmt)
 
 	// start stream
-	log.Println("Start capturing...")
-	if err := device.StartStream(3); err != nil {
-		log.Fatalf("failed to start stream: %s", err)
-	}
-
 	ctx, cancel := context.WithCancel(context.TODO())
-	frameChan, err := device.Capture(ctx, 15)
+	frameChan, err := device.StartStream(ctx)
 	if err != nil {
-		log.Fatal(err)
+		log.Fatalf("failed to stream: %s", err)
 	}
 
+
 	// process frames from capture channel
 	totalFrames := 10
 	count := 0
-	log.Println("Streaming frames from device...")
+	log.Printf("Capturing %d frames at %d fps...", totalFrames, fps)
 	for frame := range frameChan {
 		fileName := fmt.Sprintf("capture_%d.jpg", count)
 		file, err := os.Create(fileName)

+ 42 - 8
examples/device_info/devinfo.go

@@ -49,9 +49,18 @@ func main() {
 		log.Fatal(err)
 	}
 
-	if err := printCaptureParam(device); err != nil {
-		log.Fatal(err)
+	if device.Capability().IsVideoCaptureSupported() {
+		if err := printCaptureParam(device); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	if device.Capability().IsVideoOutputSupported() {
+		if err := printOutputParam(device); err != nil {
+			log.Fatal(err)
+		}
 	}
+
 }
 
 func listDevices() error {
@@ -236,23 +245,48 @@ func printCropInfo(dev *device.Device) error {
 func printCaptureParam(dev *device.Device) error {
 	params, err := dev.GetStreamParam()
 	if err != nil {
-		return fmt.Errorf("streaming capture param: %w", err)
+		return fmt.Errorf("stream capture param: %w", err)
 	}
-	fmt.Println("Streaming parameters for video capture:")
+	fmt.Println("Stream capture parameters:")
 
 	tpf := "not specified"
-	if params.Capability == v4l2.StreamParamTimePerFrame {
+	if params.Capture.Capability == v4l2.StreamParamTimePerFrame {
 		tpf = "time per frame"
 	}
 	fmt.Printf(template, "Capability", tpf)
 
 	hiqual := "not specified"
-	if params.CaptureMode == v4l2.StreamParamModeHighQuality {
+	if params.Capture.CaptureMode == v4l2.StreamParamModeHighQuality {
 		hiqual = "high quality"
 	}
 	fmt.Printf(template, "Capture mode", hiqual)
 
-	fmt.Printf(template, "Frames per second", fmt.Sprintf("%d/%d", params.TimePerFrame.Denominator, params.TimePerFrame.Numerator))
-	fmt.Printf(template, "Read buffers", fmt.Sprintf("%d", params.ReadBuffers))
+	fmt.Printf(template, "Frames per second", fmt.Sprintf("%d/%d", params.Capture.TimePerFrame.Denominator, params.Capture.TimePerFrame.Numerator))
+	fmt.Printf(template, "Read buffers", fmt.Sprintf("%d", params.Capture.ReadBuffers))
+	return nil
+}
+
+
+func printOutputParam(dev *device.Device) error {
+	params, err := dev.GetStreamParam()
+	if err != nil {
+		return fmt.Errorf("stream output param: %w", err)
+	}
+	fmt.Println("Stream output parameters:")
+
+	tpf := "not specified"
+	if params.Output.Capability == v4l2.StreamParamTimePerFrame {
+		tpf = "time per frame"
+	}
+	fmt.Printf(template, "Capability", tpf)
+
+	hiqual := "not specified"
+	if params.Output.CaptureMode == v4l2.StreamParamModeHighQuality {
+		hiqual = "high quality"
+	}
+	fmt.Printf(template, "Output mode", hiqual)
+
+	fmt.Printf(template, "Frames per second", fmt.Sprintf("%d/%d", params.Output.TimePerFrame.Denominator, params.Output.TimePerFrame.Numerator))
+	fmt.Printf(template, "Write buffers", fmt.Sprintf("%d", params.Output.WriteBuffers))
 	return nil
 }

+ 28 - 21
examples/format/devfmt.go

@@ -21,18 +21,6 @@ func main() {
 	flag.StringVar(&format, "f", format, "pixel format")
 	flag.Parse()
 
-	device, err := device.Open(devName)
-	if err != nil {
-		log.Fatalf("failed to open device: %s", err)
-	}
-	defer device.Close()
-
-	currFmt, err := device.GetPixFormat()
-	if err != nil {
-		log.Fatalf("unable to get format: %s", err)
-	}
-	log.Printf("Current format: %s", currFmt)
-
 	fmtEnc := v4l2.PixelFmtYUYV
 	switch strings.ToLower(format) {
 	case "mjpeg":
@@ -43,18 +31,37 @@ func main() {
 		fmtEnc = v4l2.PixelFmtYUYV
 	}
 
-	if err := device.SetPixFormat(v4l2.PixFormat{
-		Width: uint32(width),
-		Height: uint32(height),
-		PixelFormat: fmtEnc,
-		Field: v4l2.FieldNone,
-	}); err != nil {
-		log.Fatalf("failed to set format: %s", err)
+	device, err := device.Open(
+		devName,
+		device.WithPixFormat(v4l2.PixFormat{Width: uint32(width), Height: uint32(height), PixelFormat: fmtEnc, Field: v4l2.FieldNone}),
+		device.WithFPS(15),
+	)
+	if err != nil {
+		log.Fatalf("failed to open device: %s", err)
 	}
+	defer device.Close()
 
-	currFmt, err = device.GetPixFormat()
+	currFmt, err := device.GetPixFormat()
 	if err != nil {
 		log.Fatalf("unable to get format: %s", err)
 	}
-	log.Printf("Updated format: %s", currFmt)
+	log.Printf("Current format: %s", currFmt)
+
+	// FPS
+	fps, err := device.GetFrameRate()
+	if err != nil {
+		log.Fatalf("failed to get fps: %s", err)
+	}
+	log.Printf("current frame rate: %d fps", fps)
+	// update fps
+	if fps < 30 {
+		if err := device.SetFrameRate(30); err != nil{
+			log.Fatalf("failed to set frame rate: %s", err)
+		}
+	}
+	fps, err = device.GetFrameRate()
+	if err != nil {
+		log.Fatalf("failed to get fps: %s", err)
+	}
+	log.Printf("updated frame rate: %d fps", fps)
 }

+ 2 - 6
examples/webcam/webcam.go

@@ -136,6 +136,7 @@ func main() {
 	device, err := device.Open(devName,
 		device.WithIOType(v4l2.IOTypeMMAP),
 		device.WithPixFormat(v4l2.PixFormat{PixelFormat: getFormatType(format), Width: uint32(width), Height: uint32(height)}),
+		device.WithFPS(uint32(frameRate)),
 	)
 
 	if err != nil {
@@ -154,14 +155,9 @@ func main() {
 	log.Printf("Current format: %s", currFmt)
 	pixfmt = currFmt.PixelFormat
 
-	// Setup and start stream capture
-	if err := device.StartStream(2); err != nil {
-		log.Fatalf("unable to start stream: %s", err)
-	}
-
 	// start capture
 	ctx, cancel := context.WithCancel(context.TODO())
-	f, err := device.Capture(ctx, uint32(frameRate))
+	f, err := device.StartStream(ctx)
 	if err != nil {
 		log.Fatalf("stream capture: %s", err)
 	}

+ 2 - 0
v4l2/control.go

@@ -15,6 +15,7 @@ type Control struct {
 	Value uint32
 }
 
+// GetControl returns control value for specified ID
 func GetControl(fd uintptr, id uint32) (Control, error) {
 	var ctrl C.struct_v4l2_control
 	ctrl.id = C.uint(id)
@@ -29,6 +30,7 @@ func GetControl(fd uintptr, id uint32) (Control, error) {
 	}, nil
 }
 
+// SetControl applies control value for specified ID
 func SetControl(fd uintptr, id, value uint32) error {
 	var ctrl C.struct_v4l2_control
 	ctrl.id = C.uint(id)

+ 111 - 113
v4l2/device/device.go

@@ -6,28 +6,28 @@ import (
 	"os"
 	"reflect"
 	sys "syscall"
-	"time"
 
 	"github.com/vladimirvivien/go4vl/v4l2"
 )
 
 type Device struct {
-	path             string
-	file             *os.File
-	fd               uintptr
-	config           Config
-	bufType          v4l2.BufType
-	cap              v4l2.Capability
-	cropCap          v4l2.CropCapability
-	buffers          [][]byte
-	requestedBuf     v4l2.RequestBuffers
-	streaming        bool
+	path         string
+	file         *os.File
+	fd           uintptr
+	config       Config
+	bufType      v4l2.BufType
+	cap          v4l2.Capability
+	cropCap      v4l2.CropCapability
+	buffers      [][]byte
+	requestedBuf v4l2.RequestBuffers
+	streaming    bool
 }
 
 // Open creates opens the underlying device at specified path
 // and returns a *Device or an error if unable to open device.
 func Open(path string, options ...Option) (*Device, error) {
 	file, err := os.OpenFile(path, sys.O_RDWR|sys.O_NONBLOCK, 0644)
+	//file, err := os.OpenFile(path, sys.O_RDWR, 0644)
 	if err != nil {
 		return nil, fmt.Errorf("device open: %w", err)
 	}
@@ -78,12 +78,22 @@ func Open(path string, options ...Option) (*Device, error) {
 	dev.cropCap = cropCap
 
 	// set pix format
-	if reflect.ValueOf(dev.config.pixFormat).IsZero() {
-		pixFmt, err :=  v4l2.GetPixFormat(file.Fd())
-		if err != nil {
+	if !reflect.ValueOf(dev.config.pixFormat).IsZero() {
+		if err := dev.SetPixFormat(dev.config.pixFormat); err != nil {
 			fmt.Errorf("device open: %s: set format: %w", path, err)
 		}
-		dev.config.pixFormat = pixFmt
+	}
+
+	// set fps
+	if !reflect.ValueOf(dev.config.fps).IsZero() {
+		if err := dev.SetFrameRate(dev.config.fps); err != nil {
+			fmt.Errorf("device open: %s: set fps: %w", path, err)
+		}
+	}
+
+	// set preferred device buffer size
+	if reflect.ValueOf(dev.config.bufSize).IsZero() {
+		dev.config.bufSize = 2
 	}
 
 	return dev, nil
@@ -101,8 +111,8 @@ func (d *Device) Close() error {
 }
 
 // Name returns the device name (or path)
-func (d *Device) Name() uintptr {
-	return d.fd
+func (d *Device) Name() string {
+	return d.path
 }
 
 // FileDescriptor returns the file descriptor value for the device
@@ -110,11 +120,34 @@ func (d *Device) FileDescriptor() uintptr {
 	return d.fd
 }
 
+// Buffers returns the internal mapped buffers. This method should be
+// called after streaming has been started otherwise it may return nil.
+func (d *Device) Buffers() [][]byte {
+	return d.buffers
+}
+
 // Capability returns device capability info.
 func (d *Device) Capability() v4l2.Capability {
 	return d.cap
 }
 
+// BufferType this is a convenience method that returns the device mode (i.e. Capture, Output, etc)
+// Use method Capability for detail about the device.
+func (d *Device) BufferType() v4l2.BufType {
+	return d.bufType
+}
+
+// BufferCount returns configured number of buffers to be used during streaming.
+// If called after streaming start, this value could be updated by the driver.
+func (d *Device) BufferCount() v4l2.BufType {
+	return d.config.bufSize
+}
+
+// MemIOType returns the device memory input/output type (i.e. Memory mapped, DMA, user pointer, etc)
+func (d *Device) MemIOType() v4l2.IOType {
+	return d.config.ioType
+}
+
 // GetCropCapability returns cropping info for device
 func (d *Device) GetCropCapability() (v4l2.CropCapability, error) {
 	if !d.cap.IsVideoCaptureSupported() {
@@ -216,10 +249,42 @@ func (d *Device) SetStreamParam(param v4l2.StreamParam) error {
 	return v4l2.SetStreamParam(d.fd, d.bufType, param)
 }
 
-// SetCaptureFPS sets the video capture FPS value of the device
-func (d *Device) SetCaptureFPS(fps uint32) error {
-	capture := v4l2.CaptureParam{TimePerFrame: v4l2.Fract{Numerator: 1, Denominator: fps}}
-	return d.SetStreamParam(v4l2.StreamParam{Capture: capture})
+// SetFrameRate sets the FPS rate value of the device
+func (d *Device) SetFrameRate(fps uint32) error {
+	var param v4l2.StreamParam
+	switch {
+	case d.cap.IsVideoCaptureSupported():
+		param.Capture = v4l2.CaptureParam{TimePerFrame: v4l2.Fract{Numerator: 1, Denominator: fps}}
+	case d.cap.IsVideoOutputSupported():
+		param.Output = v4l2.OutputParam{TimePerFrame: v4l2.Fract{Numerator: 1, Denominator: fps}}
+	default:
+		return v4l2.ErrorUnsupportedFeature
+	}
+	if err := d.SetStreamParam(param); err != nil {
+		return fmt.Errorf("device: set fps: %w", err)
+	}
+	d.config.fps = fps
+	return nil
+}
+
+// GetFrameRate returns the FPS value for the device
+func (d *Device) GetFrameRate() (uint32, error) {
+	if reflect.ValueOf(d.config.fps).IsZero() {
+		param, err := d.GetStreamParam()
+		if err != nil {
+			return 0, fmt.Errorf("device: frame rate: %w", err)
+		}
+		switch {
+		case d.cap.IsVideoCaptureSupported():
+			d.config.fps = param.Capture.TimePerFrame.Denominator
+		case d.cap.IsVideoOutputSupported():
+			d.config.fps = param.Output.TimePerFrame.Denominator
+		default:
+			return 0, v4l2.ErrorUnsupportedFeature
+		}
+	}
+
+	return d.config.fps, nil
 }
 
 // GetMediaInfo returns info for a device that supports the Media API
@@ -227,123 +292,56 @@ func (d *Device) GetMediaInfo() (v4l2.MediaDeviceInfo, error) {
 	return v4l2.GetMediaDeviceInfo(d.fd)
 }
 
-func (d *Device) StartStream(buffSize uint32) error {
+func (d *Device) StartStream(ctx context.Context) (<-chan []byte, error) {
+	if ctx.Err() != nil {
+		return nil, ctx.Err()
+	}
+
+	if !d.cap.IsStreamingSupported() {
+		return nil, fmt.Errorf("device: start stream: %s", v4l2.ErrorUnsupportedFeature)
+	}
+
 	if d.streaming {
-		return nil
+		return nil, fmt.Errorf("device: stream already started")
 	}
 
 	// allocate device buffers
-	bufReq, err := v4l2.InitBuffers(d.fd, d.config.ioType, d.bufType, buffSize)
+	bufReq, err := v4l2.InitBuffers(d)
 	if err != nil {
-		return fmt.Errorf("device: start stream: %w", err)
+		return nil, fmt.Errorf("device: init buffers: %w", err)
 	}
+	d.config.bufSize = bufReq.Count // update with granted buf size
 	d.requestedBuf = bufReq
 
-	// for each device buff allocated, prepare local mapped buffer
-	bufCount := int(d.requestedBuf.Count)
-	d.buffers = make([][]byte, d.requestedBuf.Count)
-	for i := 0; i < bufCount; i++ {
-		buffer, err := v4l2.GetBuffer(d.fd, v4l2.IOTypeMMAP, d.bufType, uint32(i))
-		if err != nil {
-			return fmt.Errorf("device start stream: %w", err)
-		}
-
-		offset := buffer.Info.Offset
-		length := buffer.Length
-		mappedBuf, err := v4l2.MapMemoryBuffer(d.fd, int64(offset), int(length))
-		if err != nil {
-			return fmt.Errorf("device start stream: %w", err)
-		}
-		d.buffers[i] = mappedBuf
+	// for each allocated device buf, map into local space
+	if d.buffers, err = v4l2.MakeMappedBuffers(d); err != nil {
+		return nil, fmt.Errorf("device: make mapped buffers: %s", err)
 	}
 
 	// Initial enqueue of buffers for capture
-	for i := 0; i < bufCount; i++ {
-		_, err := v4l2.QueueBuffer(d.fd, uint32(i))
+	for i := 0; i < int(d.config.bufSize); i++ {
+		_, err := v4l2.QueueBuffer(d.fd, d.config.ioType, d.bufType, uint32(i))
 		if err != nil {
-			return fmt.Errorf("device start stream: %w", err)
+			return nil, fmt.Errorf("device: initial buffer queueing: %w", err)
 		}
 	}
 
-	// turn on device stream
-	if err := v4l2.StreamOn(d.fd); err != nil {
-		return fmt.Errorf("device start stream: %w", err)
+	dataChan, err := v4l2.StartStreamLoop(ctx, d)
+	if err != nil {
+		return nil, fmt.Errorf("device: start stream loop: %s", err)
 	}
 
 	d.streaming = true
 
-	return nil
-}
-
-// Capture captures video buffer from device and emit
-// each buffer on channel.
-func (d *Device) Capture(ctx context.Context, fps uint32) (<-chan []byte, error) {
-	if !d.streaming {
-		return nil, fmt.Errorf("device: capture: streaming not started")
-	}
-	if ctx == nil {
-		return nil, fmt.Errorf("device: context nil")
-	}
-
-	bufCount := int(d.requestedBuf.Count)
-	dataChan := make(chan []byte, bufCount)
-
-	if fps == 0 {
-		fps = 10
-	}
-
-	// delay duration based on frame per second
-	fpsDelay := time.Duration((float64(1) / float64(fps)) * float64(time.Second))
-
-	go func() {
-		defer close(dataChan)
-
-		// capture forever or until signaled to stop
-		for {
-			// capture bufCount frames
-			for i := 0; i < bufCount; i++ {
-				//TODO add better error-handling during capture, for now just panic
-				if err := v4l2.WaitForDeviceRead(d.fd, 2*time.Second); err != nil {
-					panic(fmt.Errorf("device: capture: %w", err).Error())
-				}
-
-				// dequeue the device buf
-				bufInfo, err := v4l2.DequeueBuffer(d.fd)
-				if err != nil {
-					panic(fmt.Errorf("device: capture: %w", err).Error())
-				}
-
-				// assert dequeued buffer is in proper range
-				if !(int(bufInfo.Index) < bufCount) {
-					panic(fmt.Errorf("device: capture: unexpected device buffer index: %d", bufInfo.Index).Error())
-				}
-
-				select {
-				case dataChan <- d.buffers[bufInfo.Index][:bufInfo.BytesUsed]:
-				case <-ctx.Done():
-					return
-				}
-				// enqueu used buffer, prepare for next read
-				if _, err := v4l2.QueueBuffer(d.fd, bufInfo.Index); err != nil {
-					panic(fmt.Errorf("device capture: %w", err).Error())
-				}
-
-				time.Sleep(fpsDelay)
-			}
-		}
-	}()
-
 	return dataChan, nil
 }
 
 func (d *Device) StopStream() error {
 	d.streaming = false
-	for i := 0; i < len(d.buffers); i++ {
-		if err := v4l2.UnmapMemoryBuffer(d.buffers[i]); err != nil {
-			return fmt.Errorf("device: stop stream: %w", err)
-		}
+	if err := v4l2.UnmapBuffers(d); err != nil {
+		return fmt.Errorf("device: stop stream: %s", err)
 	}
-	if err := v4l2.StreamOff(d.fd); err != nil {
+	if err := v4l2.StopStreamLoop(d); err != nil {
 		return fmt.Errorf("device: stop stream: %w", err)
 	}
 	return nil

+ 14 - 0
v4l2/device/device_config.go

@@ -7,6 +7,8 @@ import (
 type Config struct {
 	ioType v4l2.IOType
 	pixFormat v4l2.PixFormat
+	bufSize uint32
+	fps uint32
 }
 
 type Option func(*Config)
@@ -21,4 +23,16 @@ func WithPixFormat(pixFmt v4l2.PixFormat) Option {
 	return func(o *Config) {
 		o.pixFormat = pixFmt
 	}
+}
+
+func WithBufferSize(size uint32) Option {
+	return func(o *Config) {
+		o.bufSize = size
+	}
+}
+
+func WithFPS(fps uint32) Option {
+	return func(o *Config) {
+		o.fps = fps
+	}
 }

+ 10 - 6
v4l2/stream_param.go

@@ -73,13 +73,17 @@ func GetStreamParam(fd uintptr, bufType BufType) (StreamParam, error) {
 }
 
 func SetStreamParam(fd uintptr, bufType BufType, param StreamParam) error {
-	var v4l2Param C.struct_v4l2_streamparm
-	v4l2Param._type = C.uint(bufType)
-	*(*C.struct_v4l2_captureparm)(unsafe.Pointer(&v4l2Param.parm[0])) = *(*C.struct_v4l2_captureparm)(unsafe.Pointer(&param.Capture))
-	*(*C.struct_v4l2_outputparm)(unsafe.Pointer(uintptr(unsafe.Pointer(&v4l2Param.parm[0])) + unsafe.Sizeof(C.struct_v4l2_captureparam{}))) =
-		*(*C.struct_v4l2_outputparm)(unsafe.Pointer(&param.Output))
+	var v4l2Parm C.struct_v4l2_streamparm
+	v4l2Parm._type = C.uint(bufType)
+	if bufType == BufTypeVideoCapture {
+		*(*C.struct_v4l2_captureparm)(unsafe.Pointer(&v4l2Parm.parm[0])) = *(*C.struct_v4l2_captureparm)(unsafe.Pointer(&param.Capture))
+	}
+	if bufType == BufTypeVideoOutput {
+		*(*C.struct_v4l2_outputparm)(unsafe.Pointer(uintptr(unsafe.Pointer(&v4l2Parm.parm[0])) + unsafe.Sizeof(v4l2Parm.parm[0]))) =
+			*(*C.struct_v4l2_outputparm)(unsafe.Pointer(&param.Output))
+	}
 
-	if err := send(fd, C.VIDIOC_S_PARM, uintptr(unsafe.Pointer(&v4l2Param))); err != nil {
+	if err := send(fd, C.VIDIOC_S_PARM, uintptr(unsafe.Pointer(&v4l2Parm))); err != nil {
 		return fmt.Errorf("stream param: %w", err)
 	}
 

+ 73 - 43
v4l2/streaming.go

@@ -4,9 +4,7 @@ package v4l2
 import "C"
 
 import (
-	"errors"
 	"fmt"
-	"time"
 	"unsafe"
 
 	sys "golang.org/x/sys/unix"
@@ -125,9 +123,9 @@ type PlaneInfo struct {
 // StreamOn requests streaming to be turned on for
 // capture (or output) that uses memory map, user ptr, or DMA buffers.
 // https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/vidioc-streamon.html
-func StreamOn(fd uintptr) error {
-	bufType := BufTypeVideoCapture
-	if err := send(fd, C.VIDIOC_STREAMON, uintptr(unsafe.Pointer(&bufType))); err != nil {
+func StreamOn(dev Device) error {
+	bufType := dev.BufferType()
+	if err := send(dev.FileDescriptor(), C.VIDIOC_STREAMON, uintptr(unsafe.Pointer(&bufType))); err != nil {
 		return fmt.Errorf("stream on: %w", err)
 	}
 	return nil
@@ -136,9 +134,9 @@ func StreamOn(fd uintptr) error {
 // StreamOff requests streaming to be turned off for
 // capture (or output) that uses memory map, user ptr, or DMA buffers.
 // https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/vidioc-streamon.html
-func StreamOff(fd uintptr) error {
-	bufType := BufTypeVideoCapture
-	if err := send(fd, C.VIDIOC_STREAMOFF, uintptr(unsafe.Pointer(&bufType))); err != nil {
+func StreamOff(dev Device) error {
+	bufType := dev.BufferType()
+	if err := send(dev.FileDescriptor(), C.VIDIOC_STREAMOFF, uintptr(unsafe.Pointer(&bufType))); err != nil {
 		return fmt.Errorf("stream off: %w", err)
 	}
 	return nil
@@ -147,16 +145,16 @@ func StreamOff(fd uintptr) error {
 // InitBuffers sends buffer allocation request to initialize buffer IO
 // for video capture or video output when using either mem map, user pointer, or DMA buffers.
 // See https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/vidioc-reqbufs.html#vidioc-reqbufs
-func InitBuffers(fd uintptr, ioType IOType, bufType BufType, buffSize uint32) (RequestBuffers, error) {
-	if ioType != IOTypeMMAP && ioType != IOTypeDMABuf {
+func InitBuffers(dev Device) (RequestBuffers, error) {
+	if dev.MemIOType() != IOTypeMMAP && dev.MemIOType() != IOTypeDMABuf {
 		return RequestBuffers{}, fmt.Errorf("request buffers: %w", ErrorUnsupported)
 	}
 	var req C.struct_v4l2_requestbuffers
-	req.count = C.uint(buffSize)
-	req._type = C.uint(bufType)
-	req.memory = C.uint(ioType)
+	req.count = C.uint(dev.BufferCount())
+	req._type = C.uint(dev.BufferType())
+	req.memory = C.uint(dev.MemIOType())
 
-	if err := send(fd, C.VIDIOC_REQBUFS, uintptr(unsafe.Pointer(&req))); err != nil {
+	if err := send(dev.FileDescriptor(), C.VIDIOC_REQBUFS, uintptr(unsafe.Pointer(&req))); err != nil {
 		return RequestBuffers{}, fmt.Errorf("request buffers: %w", err)
 	}
 
@@ -165,13 +163,13 @@ func InitBuffers(fd uintptr, ioType IOType, bufType BufType, buffSize uint32) (R
 
 // GetBuffer retrieves buffer info for allocated buffers at provided index.
 // This call should take place after buffers are allocated with RequestBuffers (for mmap for instance).
-func GetBuffer(fd uintptr, ioType IOType, bufType BufType, index uint32) (Buffer, error) {
+func GetBuffer(dev Device, index uint32) (Buffer, error) {
 	var v4l2Buf C.struct_v4l2_buffer
-	v4l2Buf._type = C.uint(bufType)
-	v4l2Buf.memory = C.uint(ioType)
+	v4l2Buf._type = C.uint(dev.BufferType())
+	v4l2Buf.memory = C.uint(dev.MemIOType())
 	v4l2Buf.index = C.uint(index)
 
-	if err := send(fd, C.VIDIOC_QUERYBUF, uintptr(unsafe.Pointer(&v4l2Buf))); err != nil {
+	if err := send(dev.FileDescriptor(), C.VIDIOC_QUERYBUF, uintptr(unsafe.Pointer(&v4l2Buf))); err != nil {
 		return Buffer{}, fmt.Errorf("query buffer: %w", err)
 	}
 
@@ -187,6 +185,27 @@ func MapMemoryBuffer(fd uintptr, offset int64, len int) ([]byte, error) {
 	return data, nil
 }
 
+// MakeMappedBuffers creates mapped memory buffers for specified buffer count of device.
+func MakeMappedBuffers(dev Device)([][]byte, error) {
+	bufCount := int(dev.BufferCount())
+	buffers := make([][]byte, bufCount)
+	for i := 0; i < bufCount; i++ {
+		buffer, err := GetBuffer(dev, uint32(i))
+		if err != nil {
+			return nil, fmt.Errorf("mapped buffers: %w", err)
+		}
+
+		offset := buffer.Info.Offset
+		length := buffer.Length
+		mappedBuf, err := MapMemoryBuffer(dev.FileDescriptor(), int64(offset), int(length))
+		if err != nil {
+			return nil, fmt.Errorf("mapped buffers: %w", err)
+		}
+		buffers[i] = mappedBuf
+	}
+	return buffers, nil
+}
+
 // UnmapMemoryBuffer removes the buffer that was previously mapped.
 func UnmapMemoryBuffer(buf []byte) error {
 	if err := sys.Munmap(buf); err != nil {
@@ -195,14 +214,27 @@ func UnmapMemoryBuffer(buf []byte) error {
 	return nil
 }
 
+// UnmapBuffers unmaps all mapped memory buffer for device
+func UnmapBuffers(dev Device) error {
+	if dev.Buffers() == nil {
+		return fmt.Errorf("unmap buffers: uninitialized buffers")
+	}
+	for i := 0; i < len(dev.Buffers()); i++ {
+		if err := UnmapMemoryBuffer(dev.Buffers()[i]); err != nil {
+			return fmt.Errorf("unmap buffers: %w", err)
+		}
+	}
+	return nil
+}
+
 // QueueBuffer enqueues a buffer in the device driver (as empty for capturing, or filled for video output)
 // when using either memory map, user pointer, or DMA buffers. Buffer is returned with
 // additional information about the queued buffer.
 // https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/vidioc-qbuf.html#vidioc-qbuf
-func QueueBuffer(fd uintptr, index uint32) (Buffer, error) {
+func QueueBuffer(fd uintptr, ioType IOType, bufType BufType, index uint32) (Buffer, error) {
 	var v4l2Buf C.struct_v4l2_buffer
-	v4l2Buf._type = C.uint(BufTypeVideoCapture)
-	v4l2Buf.memory = C.uint(IOTypeMMAP)
+	v4l2Buf._type = C.uint(bufType)
+	v4l2Buf.memory = C.uint(ioType)
 	v4l2Buf.index = C.uint(index)
 
 	if err := send(fd, C.VIDIOC_QBUF, uintptr(unsafe.Pointer(&v4l2Buf))); err != nil {
@@ -216,10 +248,10 @@ func QueueBuffer(fd uintptr, index uint32) (Buffer, error) {
 // when using either memory map, user pointer, or DMA buffers. Buffer is returned with
 // additional information about the dequeued buffer.
 // https://www.kernel.org/doc/html/latest/userspace-api/media/v4l/vidioc-qbuf.html#vidioc-qbuf
-func DequeueBuffer(fd uintptr) (Buffer, error) {
+func DequeueBuffer(fd uintptr, ioType IOType, bufType BufType) (Buffer, error) {
 	var v4l2Buf C.struct_v4l2_buffer
-	v4l2Buf._type = C.uint(BufTypeVideoCapture)
-	v4l2Buf.memory = C.uint(IOTypeMMAP)
+	v4l2Buf._type = C.uint(bufType)
+	v4l2Buf.memory = C.uint(ioType)
 
 	if err := send(fd, C.VIDIOC_DQBUF, uintptr(unsafe.Pointer(&v4l2Buf))); err != nil {
 		return Buffer{}, fmt.Errorf("buffer dequeue: %w", err)
@@ -229,24 +261,22 @@ func DequeueBuffer(fd uintptr) (Buffer, error) {
 	return makeBuffer(v4l2Buf), nil
 }
 
-// WaitForDeviceRead blocks until the specified device is
-// ready to be read or has timedout.
-func WaitForDeviceRead(fd uintptr, timeout time.Duration) error {
-	timeval := sys.NsecToTimeval(timeout.Nanoseconds())
-	var fdsRead sys.FdSet
-	fdsRead.Set(int(fd))
-	for {
-		n, err := sys.Select(int(fd+1), &fdsRead, nil, nil, &timeval)
-		switch n {
-		case -1:
-			if err == sys.EINTR {
-				continue
-			}
-			return err
-		case 0:
-			return errors.New("wait for device ready: timeout")
-		default:
-			return nil
-		}
+// CaptureFrame captures a frame buffer from the device
+func CaptureFrame(dev Device) ([]byte, error) {
+	bufInfo, err := DequeueBuffer(dev.FileDescriptor(), dev.MemIOType(), dev.BufferType())
+	if err != nil {
+		return nil, fmt.Errorf("capture frame: dequeue: %w", err)
+	}
+	// assert dequeued buffer is in proper range
+	if !(bufInfo.Index < dev.BufferCount()) {
+		return nil, fmt.Errorf("capture frame: buffer with unexpected index: %d (out of %d)", bufInfo.Index, dev.BufferCount())
 	}
+
+	// requeue/clear used buffer, prepare for next read
+	if _, err := QueueBuffer(dev.FileDescriptor(), dev.MemIOType(), dev.BufferType(), bufInfo.Index); err != nil {
+		return nil, fmt.Errorf("capture frame: queue: %w", err)
+	}
+
+	// return captured buffer
+	return dev.Buffers()[bufInfo.Index][:bufInfo.BytesUsed], nil
 }

+ 76 - 0
v4l2/streaming_loop.go

@@ -0,0 +1,76 @@
+package v4l2
+
+import (
+	"context"
+	"fmt"
+
+	sys "golang.org/x/sys/unix"
+)
+
+// StartStreamLoop issue a streaming request for the device and sets up
+// a loop to capture incoming buffers from the device.
+func StartStreamLoop(ctx context.Context, dev Device) (chan []byte, error) {
+	if err := StreamOn(dev); err != nil {
+		return nil, fmt.Errorf("stream loop: driver stream on: %w", err)
+	}
+
+	dataChan := make(chan []byte, dev.BufferCount())
+
+	go func() {
+		defer close(dataChan)
+		for {
+			select {
+			case <-WaitForRead(dev):
+				//TODO add better error-handling, for now just panic
+				frame, err  := CaptureFrame(dev)
+				if err != nil {
+					panic(fmt.Errorf("stream loop: frame capture: %s", err).Error())
+				}
+				select {
+				case dataChan <-frame:
+				case <-ctx.Done():
+					return
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return dataChan, nil
+}
+
+// StopStreamLoop unmaps allocated IO memory and signal device to stop streaming
+func StopStreamLoop(dev Device) error {
+	if dev.Buffers() == nil {
+		return fmt.Errorf("stop loop: failed to stop loop: buffers uninitialized")
+	}
+
+	if err := StreamOff(dev); err != nil {
+		return fmt.Errorf("stop loop: stream off: %w", err)
+	}
+	return nil
+}
+
+func WaitForRead(dev Device) <-chan struct{} {
+	sigChan := make(chan struct{})
+
+	fd := dev.FileDescriptor()
+
+	go func() {
+		defer close(sigChan)
+		var fdsRead sys.FdSet
+		fdsRead.Set(int(fd))
+		for {
+			n, err := sys.Select(int(fd+1), &fdsRead, nil, nil, nil)
+			if n == -1 {
+				if err == sys.EINTR {
+					continue
+				}
+			}
+			sigChan <- struct{}{}
+		}
+	}()
+
+	return sigChan
+}

+ 11 - 0
v4l2/types.go

@@ -0,0 +1,11 @@
+package v4l2
+
+type Device interface {
+	Name() string
+	FileDescriptor() uintptr
+	Capability() Capability
+	Buffers() [][]byte
+	BufferType() BufType
+	BufferCount() uint32
+	MemIOType() IOType
+}