Concurrent Portfolio Image Pipelines

14 Jun 2016

A couple of weeks ago, I had the chance to take the simple portfolio-images tool I quickly built on Friday and improve it. As I mentioned at the end of Portfolio Image Pipelines, I updated the command to use concurrent pipelines. Rewriting the main loop around pools of workers that complete tasks concurrently turned out to be a good exercise.

Most of the update was fairly straightforward. As I had already decided earlier, an error on any one image does not stop the overall operation. Instead, each error is sent on an error channel and logged as a warning for the user.

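// Every stage sends per-image errors here; they are logged as warnings
// and never stop the pipeline.
errc := make(chan error)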

go func(ec <-chan error) {
	for err := range ec {
		log.Printf("[WARNING]: %v", err)
	}
}(errc)

The first stage of the pipeline, QueueImages, returns a channel that emits each filename to open. Each filename then flows through the rest of the pipeline on its own, so the iterations of the old implementation's loop can now run concurrently.


	inputFileCh := QueueImages(done, inputFiles...)


func QueueImages(done <-chan struct{}, filenames ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, n := range filenames {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}()
	return out
}

Every pipeline step takes a done channel. Closing it signals all of that step's goroutines to return, even one that is blocked waiting to send, so no goroutine is left behind once the work is finished or abandoned.

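The next several steps of the pipeline are similar enough that showing one is sufficient. Each middle step starts a pool of workerCount workers that pull from the incoming channel and push to the outgoing channel. Once every worker has returned, a separate goroutine closes the outgoing channel so the next step knows no more values are coming.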


	abortAllSizes := func() { bar.Add(len(imageSizes)) }
	inputImageCh := make(chan *ImageInput)
	var inputWg sync.WaitGroup
	inputWg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go func() {
			OpenImages(done, abortCh(abortAllSizes), inputFileCh, inputImageCh)
			inputWg.Done()
		}()
	}
	go func() {
		inputWg.Wait()
		close(inputImageCh)
	}()


func OpenImages(done <-chan struct{}, errc chan<- error, filenames <-chan string, imgc chan<- *ImageInput) {
	for n := range filenames {
		// A send on a nil channel blocks forever, so a select case on a nil channel is never chosen.
		// Assign only the channel we actually want to send on; the other stays nil.
		var send chan<- *ImageInput
		var ec chan<- error

		ii, err := NewImageInput(n)
		if err != nil {
			ec = errc
		} else {
			send = imgc
		}

		select {
		case send <- ii:
		case ec <- err:
		case <-done:
			return
		}
	}
}

This pattern for pipelines is simple and powerful, and I find it a clean, easy way to reason about a process. After two more steps, ReadyImages and ResizeImages, the final step, SaveImages, is only slightly different: instead of closing its output from a separate goroutine, this step blocks until the whole operation is complete, then closes done and errc.


	// Step Four: Save Image (end of pipeline)
	var savedWg sync.WaitGroup
	savedWg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go func() {
			SaveImages(done, errc, resizedImageCh, successCh, imageQuality)
			savedWg.Done()
		}()
	}
	savedWg.Wait()
	close(done)
	close(errc)

Overall, making this tool operate concurrently was easy and effective. Now, assuming workerCount is set high enough, an image that needs more processing no longer creates a bottleneck: the other workers keep handling the remaining images while it finishes.

Future Improvements

As with any project, there are some things I would like to improve when I swing back around to this again. If you spot something beyond these basic few, please open an issue or make the change and send me a pull request.
