common/para: Add parallel task executor helper
authorBjørn Erik Pedersen <bjorn.erik.pedersen@gmail.com>
Thu, 21 Nov 2019 17:38:14 +0000 (18:38 +0100)
committerBjørn Erik Pedersen <bjorn.erik.pedersen@gmail.com>
Thu, 21 Nov 2019 17:38:14 +0000 (18:38 +0100)
Usage of this will come later.

common/para/para.go [new file with mode: 0644]
common/para/para_test.go [new file with mode: 0644]

diff --git a/common/para/para.go b/common/para/para.go
new file mode 100644 (file)
index 0000000..319bdb7
--- /dev/null
@@ -0,0 +1,73 @@
+// Copyright 2019 The Hugo Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package para implements parallel execution helpers.
+package para
+
+import (
+       "context"
+
+       "golang.org/x/sync/errgroup"
+)
+
+// Workers configures a task executor with the most number of tasks to be executed in parallel.
+type Workers struct {
+       sem chan struct{}
+}
+
+// Runner wraps the lifecycle methods of a new task set.
+//
+// Run wil block until a worker is available or the context is cancelled,
+// and then run the given func in a new goroutine.
+// Wait will wait for all the running goroutines to finish.
+type Runner interface {
+       Run(func() error)
+       Wait() error
+}
+
+type errGroupRunner struct {
+       *errgroup.Group
+       w *Workers
+       ctx  context.Context
+}
+
+func (g *errGroupRunner) Run(fn func() error) {
+       select {
+       case g.w.sem <- struct{}{}:
+       case <-g.ctx.Done():
+               return
+       }
+
+       g.Go(func() error {
+               err := fn()
+               <-g.w.sem
+               return err
+       })
+}
+
+// New creates a new Workers with the given number of workers.
+func New(numWorkers int) *Workers {
+       return &Workers{
+               sem: make(chan struct{}, numWorkers),
+       }
+}
+
+// Start starts a new Runner.
+func (w *Workers) Start(ctx context.Context) (Runner, context.Context) {
+       g, ctx := errgroup.WithContext(ctx)
+       return &errGroupRunner{
+               Group: g,
+               ctx:   ctx,
+               w:  w,
+       }, ctx
+}
diff --git a/common/para/para_test.go b/common/para/para_test.go
new file mode 100644 (file)
index 0000000..9f33a23
--- /dev/null
@@ -0,0 +1,85 @@
+// Copyright 2019 The Hugo Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package para
+
+import (
+       "context"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       qt "github.com/frankban/quicktest"
+)
+
+func TestPara(t *testing.T) {
+
+       c := qt.New(t)
+
+       c.Run("Order", func(c *qt.C) {
+               n := 500
+               ints := make([]int, n)
+               for i := 0; i < n; i++ {
+                       ints[i] = i
+               }
+
+               p := New(4)
+               r, _ := p.Start(context.Background())
+
+               var result []int
+               var mu sync.Mutex
+               for i := 0; i < n; i++ {
+                       i := i
+                       r.Run(func() error {
+                               mu.Lock()
+                               defer mu.Unlock()
+                               result = append(result, i)
+                               return nil
+                       })
+               }
+
+               c.Assert(r.Wait(), qt.IsNil)
+               c.Assert(result, qt.HasLen, len(ints))
+               c.Assert(sort.IntsAreSorted(result), qt.Equals, false, qt.Commentf("Para does not seem to be parallel"))
+               sort.Ints(result)
+               c.Assert(result, qt.DeepEquals, ints)
+
+       })
+
+       c.Run("Time", func(c *qt.C) {
+               const n = 100
+
+               p := New(5)
+               r, _ := p.Start(context.Background())
+
+               start := time.Now()
+
+               var counter int64
+
+               for i := 0; i < n; i++ {
+                       r.Run(func() error {
+                               atomic.AddInt64(&counter, 1)
+                               time.Sleep(1 * time.Millisecond)
+                               return nil
+                       })
+               }
+
+               c.Assert(r.Wait(), qt.IsNil)
+               c.Assert(counter, qt.Equals, int64(n))
+               c.Assert(time.Since(start) < n/2*time.Millisecond, qt.Equals, true)
+
+       })
+
+}