Skip to content

Instantly share code, notes, and snippets.

@josnidhin
Created May 8, 2026 15:09
Show Gist options
  • Select an option

  • Save josnidhin/1048df48646d86aff3e82ce6e2ba52d3 to your computer and use it in GitHub Desktop.

Select an option

Save josnidhin/1048df48646d86aff3e82ce6e2ba52d3 to your computer and use it in GitHub Desktop.
An implementation of a capacity-aware synchronization primitive that I named ThresholdBarrier. It can be used to answer the question "Is there enough capacity to fetch another batch for processing?", e.g. fetching from SQS only when there is capacity to process the messages.
/**
* @author Jose Nidhin
*/
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
// ThresholdBarrier combines a counting semaphore with a free-slot counter so
// that a caller can wait until at least threshold slots are free.
type ThresholdBarrier struct {
	slots     int // total number of slots
	threshold int // number of free slots that counts as enough capacity

	sem      chan struct{} // buffered channel holding one token per occupied slot
	notifyCh chan struct{} // signals a waiter that enough slots are free again

	mu        sync.Mutex // guards slotsUsed
	slotsUsed int        // number of slots currently occupied
}
// NewThresholdBarrier creates a ThresholdBarrier with a valid initial state.
//
// slots is the total number of slots. Values below 1 are raised to 1: a
// negative value would make the channel allocation panic, and zero would
// produce a barrier that no worker can enter and on which Wait can never
// succeed.
//
// threshold is the number of free slots that counts as enough capacity. It is
// clamped to the range [1, slots].
func NewThresholdBarrier(slots, threshold int) *ThresholdBarrier {
	if slots < 1 {
		slots = 1
	}
	if threshold > slots {
		threshold = slots
	}
	if threshold < 1 {
		threshold = 1
	}

	return &ThresholdBarrier{
		sem:       make(chan struct{}, slots),
		slots:     slots,
		threshold: threshold,
		notifyCh:  make(chan struct{}, 1),
	}
}
// Enter claims one slot in the ThresholdBarrier, blocking until a slot is
// available or ctx is done.
//
// It returns nil once the slot has been claimed and ctx.Err() if ctx finishes
// first. When a slot is free and ctx is already done, either outcome may occur
// because select chooses among ready cases at random.
func (tb *ThresholdBarrier) Enter(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case tb.sem <- struct{}{}:
		// Token placed; fall through to record the occupied slot.
	}

	tb.mu.Lock()
	defer tb.mu.Unlock()
	tb.slotsUsed++
	return nil
}
// Done releases one slot. If no slot is currently held it returns without
// changing any state. When the release leaves at least threshold slots free,
// it posts a notification for Wait.
func (tb *ThresholdBarrier) Done() {
	// Hand the semaphore token back; with no token there is nothing to release.
	select {
	case <-tb.sem:
	default:
		return
	}

	tb.mu.Lock()
	tb.slotsUsed--
	available := tb.slots - tb.slotsUsed
	tb.mu.Unlock()

	if available < tb.threshold {
		return
	}

	// Non-blocking send: if a notification is already pending, one is enough.
	select {
	case tb.notifyCh <- struct{}{}:
	default:
	}
}
// Wait blocks until at least threshold slots are free or ctx is done.
//
// It returns nil once enough capacity is available and ctx.Err() if ctx
// finishes first. When capacity is already available on entry, Wait returns
// nil without consulting ctx.
func (tb *ThresholdBarrier) Wait(ctx context.Context) error {
	for {
		tb.mu.Lock()
		free := tb.slots - tb.slotsUsed
		tb.mu.Unlock()
		if free >= tb.threshold {
			return nil
		}

		// A token in notifyCh may be stale: the early return above does not
		// drain the channel, and slots can be claimed again after a token was
		// posted. Treat a wake-up as a hint only and re-check the free count
		// instead of reporting capacity that is no longer there.
		select {
		case <-tb.notifyCh:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// Fetch returns a batch of count pseudo-random non-negative ints, standing in
// for work items pulled from an upstream source. It returns nil when count is
// not positive.
func Fetch(count int) []int {
	if count <= 0 {
		return nil
	}

	// The final length is known, so allocate once instead of growing by append.
	result := make([]int, count)
	for i := range result {
		result[i] = rand.Int()
	}
	return result
}
// main demonstrates ThresholdBarrier with a fetch/process loop. It fetches a
// batch, hands each item to its own goroutine, and fetches the next batch only
// after Wait reports that at least threshold slots are free. It stops after
// maxFetches batches and waits for all workers before returning.
func main() {
	const (
		slots      = 6  // maximum number of items processed at the same time
		threshold  = 2  // free slots required before fetching; also the batch size
		maxFetches = 12 // number of batches to fetch before shutting down
	)

	ctx := context.Background()
	tb := NewThresholdBarrier(slots, threshold)

	var wg sync.WaitGroup
	// Let in-flight workers finish on every exit path, including errors.
	defer wg.Wait()

	for fetchCount := 0; fetchCount < maxFetches; fetchCount++ {
		fmt.Printf("--- Fetch Batch: %d ---\n", threshold)

		for _, v := range Fetch(threshold) {
			// Claim the slot before the worker goroutine starts, so the
			// occupancy that Wait inspects below already includes this batch.
			// Entering inside the goroutine would let Wait run before the
			// workers are scheduled and fetch more than there is capacity for.
			if err := tb.Enter(ctx); err != nil {
				fmt.Printf("%d Failed to enter: %v\n", v, err)
				return
			}

			wg.Go(func() {
				defer tb.Done()
				fmt.Printf("Processing %d\n", v)
				time.Sleep(time.Duration(v%3+1) * time.Second)
				fmt.Printf("Finished %d\n", v)
			})
		}

		if err := tb.Wait(ctx); err != nil {
			fmt.Printf("Failed to wait: %v\n", err)
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment