Created
May 8, 2026 15:09
-
-
Save josnidhin/1048df48646d86aff3e82ce6e2ba52d3 to your computer and use it in GitHub Desktop.
An implementation of a capacity-aware synchronization primitive that I named ThresholdBarrier. It can be used to answer the question "Is there enough capacity to fetch another batch for processing?", e.g. fetching from SQS only when there is capacity to process the messages.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * @author Jose Nidhin | |
| */ | |
| package main | |
| import ( | |
| "context" | |
| "fmt" | |
| "math/rand" | |
| "sync" | |
| "time" | |
| ) | |
// ThresholdBarrier is a capacity-aware synchronization primitive. Workers
// occupy a slot with Enter and release it with Done, while a producer calls
// Wait to block until at least threshold slots are free.
//
// A ThresholdBarrier must be created with NewThresholdBarrier and must not be
// copied after first use (it contains a sync.Mutex).
type ThresholdBarrier struct {
	// sem is a counting semaphore whose buffer capacity equals slots; one
	// element is queued per occupied slot.
	sem chan struct{}
	// slots is the total number of slots; fixed at construction.
	slots int
	// threshold is the minimum number of free slots Wait requires; fixed at
	// construction and always within [1, slots].
	threshold int

	// mu guards slotsUsed.
	mu        sync.Mutex
	slotsUsed int

	// notifyCh carries at most one pending "capacity may be available"
	// wake-up from Done to Wait.
	notifyCh chan struct{}
}

// NewThresholdBarrier creates a ThresholdBarrier with a valid initial state.
//
// slots is the total capacity and is raised to 1 when it is not positive: a
// negative value would make the channel allocation panic, and zero would
// produce a barrier nobody can ever enter. threshold is clamped to the range
// [1, slots].
func NewThresholdBarrier(slots, threshold int) *ThresholdBarrier {
	if slots < 1 {
		slots = 1
	}
	if threshold > slots {
		threshold = slots
	}
	if threshold < 1 {
		threshold = 1
	}

	return &ThresholdBarrier{
		sem:       make(chan struct{}, slots),
		slots:     slots,
		threshold: threshold,
		notifyCh:  make(chan struct{}, 1),
	}
}

// Enter is called when a worker wants to enter the ThresholdBarrier. It
// occupies one slot, blocking while all slots are in use. It returns
// ctx.Err() if ctx is done before a slot could be taken, otherwise nil.
// Every successful Enter must be paired with exactly one Done.
func (tb *ThresholdBarrier) Enter(ctx context.Context) error {
	select {
	case tb.sem <- struct{}{}:
		tb.mu.Lock()
		tb.slotsUsed++
		tb.mu.Unlock()
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Done releases a slot previously taken with Enter and, when the number of
// free slots has reached the threshold, leaves a wake-up for Wait. Done never
// blocks; calling it while no slot is occupied is a no-op.
func (tb *ThresholdBarrier) Done() {
	select {
	case <-tb.sem:
	default:
		// Nothing is occupying a slot, so there is nothing to release.
		return
	}

	tb.mu.Lock()
	tb.slotsUsed--
	free := tb.slots - tb.slotsUsed
	tb.mu.Unlock()

	if free >= tb.threshold {
		// Non-blocking send: a single pending wake-up is enough, because Wait
		// re-checks the capacity itself after waking.
		select {
		case tb.notifyCh <- struct{}{}:
		default:
		}
	}
}

// Wait blocks until at least threshold slots are free and then returns nil.
// It returns ctx.Err() if ctx is done while the capacity is still
// insufficient. If enough slots are already free it returns nil immediately,
// without consulting ctx.
//
// Only one wake-up is buffered, so a single Done wakes at most one blocked
// Wait call.
func (tb *ThresholdBarrier) Wait(ctx context.Context) error {
	for {
		tb.mu.Lock()
		free := tb.slots - tb.slotsUsed
		tb.mu.Unlock()

		if free >= tb.threshold {
			return nil
		}

		select {
		case <-tb.notifyCh:
			// The wake-up may be stale (sent before later Enter calls used
			// the capacity up again), so loop and re-check instead of
			// reporting success blindly.
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// Fetch simulates fetching a batch of work items. It returns count
// pseudo-random non-negative ints, or nil when count is not positive.
func Fetch(count int) []int {
	if count <= 0 {
		return nil
	}

	// The final length is known up front, so allocate once instead of
	// growing the slice through repeated append calls.
	result := make([]int, count)
	for i := range result {
		result[i] = rand.Int()
	}
	return result
}
| func main() { | |
| const slots = 6 | |
| const threshold = 2 | |
| ctx := context.Background() | |
| tb := NewThresholdBarrier(slots, threshold) | |
| fetchCount := 0 | |
| var wg sync.WaitGroup | |
| for { | |
| if fetchCount >= 12 { | |
| wg.Wait() | |
| return | |
| } | |
| fmt.Printf("--- Fetch Batch: %d ---\n", threshold) | |
| vals := Fetch(threshold) | |
| fetchCount++ | |
| for _, v := range vals { | |
| wg.Go(func() { | |
| err := tb.Enter(ctx) | |
| if err != nil { | |
| fmt.Printf("%d Failed to enter: %v\n", v, err) | |
| return | |
| } | |
| defer tb.Done() | |
| fmt.Printf("Processing %d\n", v) | |
| time.Sleep(time.Duration(v%3+1) * time.Second) | |
| fmt.Printf("Finished %d\n", v) | |
| }) | |
| } | |
| err := tb.Wait(ctx) | |
| if err != nil { | |
| fmt.Printf("Failed to wait: %v\n", err) | |
| return | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment