From 8111b12da5ebbc79539138c92e33a15efcc268af Mon Sep 17 00:00:00 2001
From: duanhf2012 <6549168@qq.com>
Date: Wed, 22 Feb 2023 09:53:50 +0800
Subject: [PATCH] Add asynchronous function execution
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 concurrent/concurrent.go    |  91 ++++
 concurrent/dispatch.go      | 195 +++++++++
 concurrent/worker.go        |  78 ++++
 event/event.go              |   3 +-
 event/eventpool.go          |  24 ++
 event/eventtype.go          |   6 +-
 service/module.go           |   4 +
 service/service.go          |  41 +-
 util/queue/deque.go         | 413 ++++++++++++++++++
 util/queue/deque_test.go    | 836 ++++++++++++++++++++++++++++++++++++
 util/queue/priorityqueue.go |   7 +
 11 files changed, 1674 insertions(+), 24 deletions(-)
 create mode 100644 concurrent/concurrent.go
 create mode 100644 concurrent/dispatch.go
 create mode 100644 concurrent/worker.go
 create mode 100644 event/eventpool.go
 create mode 100644 util/queue/deque.go
 create mode 100644 util/queue/deque_test.go

diff --git a/concurrent/concurrent.go b/concurrent/concurrent.go
new file mode 100644
index 0000000..70d9e89
--- /dev/null
+++ b/concurrent/concurrent.go
@@ -0,0 +1,91 @@
+package concurrent
+
+import (
+	"errors"
+	"runtime"
+
+	"github.com/duanhf2012/origin/log"
+)
+
+const defaultMaxTaskChannelNum = 1000000
+
+type IConcurrent interface {
+	OpenConcurrentByNumCPU(cpuMul float32)
+	OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32, maxTaskChannelNum int)
+	AsyncDoByQueue(queueId int64, fn func(), cb func(err error))
+	AsyncDo(f func(), cb func(err error))
+}
+
+type Concurrent struct {
+	dispatch
+
+	tasks     chan task
+	cbChannel chan func(error)
+}
+
+/*
+cpuNumMul is the multiplier applied to runtime.NumCPU().
+Recommendation: (1) use 1 for CPU-bound work (2) use 2 or higher for I/O-bound work.
+*/
+func (c *Concurrent) OpenConcurrentByNumCPU(cpuNumMul float32) {
+	goroutineNum := int32(float32(runtime.NumCPU())*cpuNumMul + 1)
+	c.OpenConcurrent(goroutineNum, goroutineNum, defaultMaxTaskChannelNum)
+}
+
+func (c *Concurrent) OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32, maxTaskChannelNum int) {
+	c.tasks = make(chan task, maxTaskChannelNum)
+	c.cbChannel = make(chan func(error), maxTaskChannelNum)
+
+	// start the dispatcher
+	c.dispatch.open(minGoroutineNum, maxGoroutineNum, c.tasks, c.cbChannel)
+}
+
+func (c *Concurrent) AsyncDo(f func(), cb func(err error)) {
+	c.AsyncDoByQueue(0, f, cb)
+}
+
+func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)) {
+	if cap(c.tasks) == 0 {
+		panic("not open concurrent")
+	}
+
+	if fn == nil && cb == nil {
+		log.SStack("fn and cb are nil")
+		return
+	}
+
+	if len(c.tasks) >= cap(c.tasks) {
+		log.SError("tasks channel is full")
+		if cb != nil {
+			c.pushAsyncDoCallbackEvent(func(err error) {
+				cb(errors.New("tasks channel is full"))
+			})
+		}
+		return
+	}
+
+	if fn == nil {
+		c.pushAsyncDoCallbackEvent(cb)
+		return
+	}
+
+	select {
+	case c.tasks <- task{queueId, fn, cb}:
+	}
+}
+
+func (c *Concurrent) Close() {
+	if cap(c.tasks) == 0 {
+		return
+	}
+
+	log.SRelease("wait close concurrent")
+
+	c.dispatch.close()
+
+	log.SRelease("concurrent has successfully exited")
+}
+
+func (c *Concurrent) GetCallBackChannel() chan func(error) {
+	return c.cbChannel
+}
diff --git a/concurrent/dispatch.go b/concurrent/dispatch.go
new file mode 100644
index 0000000..6094e5b
--- /dev/null
+++ b/concurrent/dispatch.go
@@ -0,0 +1,195 @@
+package concurrent
+
+import (
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"fmt"
+	"runtime"
+
+	"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/queue" +) + +var idleTimeout = 2 * time.Second + +type dispatch struct { + minConcurrentNum int32 + maxConcurrentNum int32 + + queueIdChannel chan int64 + workerQueue chan task + tasks chan task + idle bool + workerNum int32 + cbChannel chan func(error) + + mapTaskQueueSession map[int64]*queue.Deque[task] + + waitWorker sync.WaitGroup + waitDispatch sync.WaitGroup +} + +func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan task, cbChannel chan func(error)) { + d.minConcurrentNum = minGoroutineNum + d.maxConcurrentNum = maxGoroutineNum + d.tasks = tasks + d.mapTaskQueueSession = make(map[int64]*queue.Deque[task], 1024) + d.workerQueue = make(chan task) + d.cbChannel = cbChannel + d.queueIdChannel = make(chan int64, cap(tasks)) + + d.waitDispatch.Add(1) + go d.run() +} + +func (d *dispatch) run() { + defer d.waitDispatch.Done() + timeout := time.NewTimer(idleTimeout) + + for { + select { + case queueId := <-d.queueIdChannel: + d.processqueueEvent(queueId) + default: + select { + case t, ok := <-d.tasks: + if ok == false { + return + } + d.processTask(&t) + case queueId := <-d.queueIdChannel: + d.processqueueEvent(queueId) + case <-timeout.C: + d.processTimer() + if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 { + idleTimeout = time.Millisecond * 10 + } + timeout.Reset(idleTimeout) + } + } + + if atomic.LoadInt32(&d.minConcurrentNum) == -1 && d.workerNum == 0 { + d.waitWorker.Wait() + d.cbChannel <- nil + return + } + } +} + +func (d *dispatch) processTimer() { + if d.idle == true && d.workerNum > d.minConcurrentNum { + d.processIdle() + } + + d.idle = true +} + +func (d *dispatch) processqueueEvent(queueId int64) { + d.idle = false + + queueSession := d.mapTaskQueueSession[queueId] + if queueSession == nil { + return + } + + queueSession.PopFront() + if queueSession.Len() == 0 { + return + } + + t := queueSession.Front() + d.executeTask(&t) +} + +func (d *dispatch) executeTask(t *task) { + select { + case d.workerQueue <- *t: + return + default: + if d.workerNum < d.maxConcurrentNum { + var work worker + work.start(&d.waitWorker, t, d) + return + } + } + + d.workerQueue <- *t +} + +func (d *dispatch) processTask(t *task) { + d.idle = false + + //处理有排队任务 + if t.queueId != 0 { + queueSession := d.mapTaskQueueSession[t.queueId] + if queueSession == nil { + queueSession = &queue.Deque[task]{} + d.mapTaskQueueSession[t.queueId] = queueSession + } + + //没有正在执行的任务,则直接执行 + if queueSession.Len() == 0 { + d.executeTask(t) + } + + queueSession.PushBack(*t) + return + } + + //普通任务 + d.executeTask(t) +} + +func (d *dispatch) processIdle() { + select { + case d.workerQueue <- task{}: + d.workerNum-- + default: + } +} + +func (d *dispatch) pushQueueTaskFinishEvent(queueId int64) { + d.queueIdChannel <- queueId +} + +func (c *dispatch) pushAsyncDoCallbackEvent(cb func(err error)) { + if cb == nil { + //不需要回调的情况 + return + } + + c.cbChannel <- cb +} + +func (d *dispatch) close() { + atomic.StoreInt32(&d.minConcurrentNum, -1) + +breakFor: + for { + select { + case cb := <-d.cbChannel: + if cb == nil { + break breakFor + } + cb(nil) + } + } + + d.waitDispatch.Wait() +} + +func (d *dispatch) DoCallback(cb func(err error)) { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + l := runtime.Stack(buf, false) + errString := fmt.Sprint(r) + + log.SError("core dump info[", errString, "]\n", string(buf[:l])) + } + }() + + cb(nil) +} diff --git a/concurrent/worker.go b/concurrent/worker.go new file mode 100644 
index 0000000..682943d
--- /dev/null
+++ b/concurrent/worker.go
@@ -0,0 +1,78 @@
+package concurrent
+
+import (
+	"sync"
+
+	"errors"
+	"fmt"
+	"runtime"
+
+	"github.com/duanhf2012/origin/log"
+)
+
+type task struct {
+	queueId int64
+	fn      func()
+	cb      func(err error)
+}
+
+type worker struct {
+	*dispatch
+}
+
+func (t *task) isExitTask() bool {
+	return t.fn == nil
+}
+
+func (w *worker) start(waitGroup *sync.WaitGroup, t *task, d *dispatch) {
+	w.dispatch = d
+	d.workerNum += 1
+	waitGroup.Add(1)
+	go w.run(waitGroup, *t)
+}
+
+func (w *worker) run(waitGroup *sync.WaitGroup, t task) {
+	defer waitGroup.Done()
+
+	w.exec(&t)
+	for {
+		select {
+		case tw := <-w.workerQueue:
+			if tw.isExitTask() {
+				//exit goroutine
+				log.SRelease("worker goroutine exit")
+				return
+			}
+			w.exec(&tw)
+		}
+	}
+}
+
+func (w *worker) exec(t *task) {
+	defer func() {
+		if r := recover(); r != nil {
+			buf := make([]byte, 4096)
+			l := runtime.Stack(buf, false)
+			errString := fmt.Sprint(r)
+
+			cb := t.cb
+			t.cb = func(err error) {
+				cb(errors.New(errString))
+			}
+
+			w.endCallFun(t)
+			log.SError("core dump info[", errString, "]\n", string(buf[:l]))
+		}
+	}()
+
+	t.fn()
+	w.endCallFun(t)
+}
+
+func (w *worker) endCallFun(t *task) {
+	w.pushAsyncDoCallbackEvent(t.cb)
+
+	if t.queueId != 0 {
+		w.pushQueueTaskFinishEvent(t.queueId)
+	}
+}
diff --git a/event/event.go b/event/event.go index 94c4582..5cf5890 100644 --- a/event/event.go +++ b/event/event.go @@ -7,7 +7,6 @@ import ( "sync" ) - //事件接受器 type EventCallBack func(event IEvent) @@ -229,7 +228,6 @@ func (processor *EventProcessor) EventHandler(ev IEvent) { } } - func (processor *EventProcessor) castEvent(event IEvent){ if processor.mapListenerEvent == nil { log.SError("mapListenerEvent not init!") @@ -246,3 +244,4 @@ func (processor *EventProcessor) castEvent(event IEvent){ proc.PushEvent(event) } } +
diff --git a/event/eventpool.go b/event/eventpool.go
new file mode 100644
index 0000000..7edd7a7
--- /dev/null
+++ b/event/eventpool.go
@@ -0,0 +1,24 @@
+package event
+
+import "github.com/duanhf2012/origin/util/sync"
+
+// eventPool caches Event objects for reuse
+const defaultMaxEventChannelNum = 2000000
+
+var eventPool = sync.NewPoolEx(make(chan sync.IPoolData, defaultMaxEventChannelNum), func() sync.IPoolData {
+	return &Event{}
+})
+
+func NewEvent() *Event{
+	return eventPool.Get().(*Event)
+}
+
+func DeleteEvent(event IEvent){
+	eventPool.Put(event.(sync.IPoolData))
+}
+
+func SetEventPoolSize(eventPoolSize int){
+	eventPool = sync.NewPoolEx(make(chan sync.IPoolData, eventPoolSize), func() sync.IPoolData {
+		return &Event{}
+	})
+}
diff --git a/event/eventtype.go b/event/eventtype.go index daac65d..a0ff85e 100644 --- a/event/eventtype.go +++ b/event/eventtype.go @@ -12,7 +12,11 @@ const ( Sys_Event_WebSocket EventType = -5 Sys_Event_Node_Event EventType = -6 Sys_Event_DiscoverService EventType = -7 - + Sys_Event_DiscardGoroutine EventType = -8 + Sys_Event_QueueTaskFinish EventType = -9 + Sys_Event_User_Define EventType = 1 + + )
diff --git a/service/module.go b/service/module.go index 5df993e..bc4e502 100644 --- a/service/module.go +++ b/service/module.go @@ -10,11 +10,13 @@ import ( "github.com/duanhf2012/origin/log" rpcHandle "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/util/timer" + "github.com/duanhf2012/origin/concurrent" ) const InitModuleId = 1e9 type IModule interface { + concurrent.IConcurrent SetModuleId(moduleId uint32) bool GetModuleId() uint32 AddModule(module IModule) (uint32, error) @@ -56,6 +58,7 @@ type Module struct { //事件管道 eventHandler
event.IEventHandler + concurrent.IConcurrent } func (m *Module) SetModuleId(moduleId uint32) bool { @@ -105,6 +108,7 @@ func (m *Module) AddModule(module IModule) (uint32, error) { pAddModule.moduleName = reflect.Indirect(reflect.ValueOf(module)).Type().Name() pAddModule.eventHandler = event.NewEventHandler() pAddModule.eventHandler.Init(m.eventHandler.GetEventProcessor()) + pAddModule.IConcurrent = m.IConcurrent err := module.OnInit() if err != nil { return 0, err diff --git a/service/service.go b/service/service.go index 12296e5..a91f0bf 100644 --- a/service/service.go +++ b/service/service.go @@ -7,16 +7,17 @@ import ( "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/profiler" "github.com/duanhf2012/origin/rpc" - originSync "github.com/duanhf2012/origin/util/sync" "github.com/duanhf2012/origin/util/timer" "reflect" "runtime" "strconv" "sync" "sync/atomic" + "github.com/duanhf2012/origin/concurrent" ) var timerDispatcherLen = 100000 +var maxServiceEventChannelNum = 2000000 type IService interface { Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) @@ -40,14 +41,9 @@ type IService interface { OpenProfiler() } -// eventPool的内存池,缓存Event -var maxServiceEventChannel = 2000000 -var eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData { - return &event.Event{} -}) - type Service struct { Module + rpcHandler rpc.RpcHandler //rpc name string //service name wg sync.WaitGroup @@ -59,8 +55,7 @@ type Service struct { nodeEventLister rpc.INodeListener discoveryServiceLister rpc.IDiscoveryServiceListener chanEvent chan event.IEvent - - closeSig chan bool + closeSig chan struct{} } // RpcConnEvent Node结点连接事件 @@ -77,10 +72,7 @@ type DiscoveryServiceEvent struct{ } func SetMaxServiceChannel(maxEventChannel int){ - maxServiceEventChannel = maxEventChannel - eventPool = originSync.NewPoolEx(make(chan originSync.IPoolData, maxServiceEventChannel), func() originSync.IPoolData { - return &event.Event{} - }) + maxServiceEventChannelNum = maxEventChannel } func (rpcEventData *DiscoveryServiceEvent) GetEventType() event.EventType{ @@ -105,10 +97,10 @@ func (s *Service) OpenProfiler() { } func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) { - s.closeSig = make(chan bool, 1) + s.closeSig = make(chan struct{}) s.dispatcher =timer.NewDispatcher(timerDispatcherLen) if s.chanEvent == nil { - s.chanEvent = make(chan event.IEvent,maxServiceEventChannel) + s.chanEvent = make(chan event.IEvent,maxServiceEventChannelNum) } s.rpcHandler.InitRpcHandler(iService.(rpc.IRpcHandler),getClientFun,getServerFun,iService.(rpc.IRpcHandlerChannel)) @@ -124,6 +116,7 @@ func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServe s.eventProcessor.Init(s) s.eventHandler = event.NewEventHandler() s.eventHandler.Init(s.eventProcessor) + s.Module.IConcurrent = &concurrent.Concurrent{} } func (s *Service) Start() { @@ -146,12 +139,19 @@ func (s *Service) Start() { func (s *Service) Run() { defer s.wg.Done() var bStop = false + + concurrent := s.IConcurrent.(*concurrent.Concurrent) + concurrentCBChannel := concurrent.GetCallBackChannel() + s.self.(IService).OnStart() for{ var analyzer *profiler.Analyzer select { case <- s.closeSig: bStop = true + concurrent.Close() + case cb:=<-concurrentCBChannel: + concurrent.DoCallback(cb) case ev := <- s.chanEvent: switch ev.GetEventType() { case 
event.ServiceRpcRequestEvent: @@ -174,7 +174,7 @@ func (s *Service) Run() { analyzer.Pop() analyzer = nil } - eventPool.Put(cEvent) + event.DeleteEvent(cEvent) case event.ServiceRpcResponseEvent: cEvent,ok := ev.(*event.Event) if ok == false { @@ -194,7 +194,7 @@ func (s *Service) Run() { analyzer.Pop() analyzer = nil } - eventPool.Put(cEvent) + event.DeleteEvent(cEvent) default: if s.profiler!=nil { analyzer = s.profiler.Push("[SEvent]"+strconv.Itoa(int(ev.GetEventType()))) @@ -329,9 +329,8 @@ func (s *Service) UnRegDiscoverListener(rpcLister rpc.INodeListener) { UnRegDiscoveryServiceEventFun(s.GetName()) } - func (s *Service) PushRpcRequest(rpcRequest *rpc.RpcRequest) error{ - ev := eventPool.Get().(*event.Event) + ev := event.NewEvent() ev.Type = event.ServiceRpcRequestEvent ev.Data = rpcRequest @@ -339,7 +338,7 @@ func (s *Service) PushRpcRequest(rpcRequest *rpc.RpcRequest) error{ } func (s *Service) PushRpcResponse(call *rpc.Call) error{ - ev := eventPool.Get().(*event.Event) + ev := event.NewEvent() ev.Type = event.ServiceRpcResponseEvent ev.Data = call @@ -351,7 +350,7 @@ func (s *Service) PushEvent(ev event.IEvent) error{ } func (s *Service) pushEvent(ev event.IEvent) error{ - if len(s.chanEvent) >= maxServiceEventChannel { + if len(s.chanEvent) >= maxServiceEventChannelNum { err := errors.New("The event channel in the service is full") log.SError(err.Error()) return err diff --git a/util/queue/deque.go b/util/queue/deque.go new file mode 100644 index 0000000..380b118 --- /dev/null +++ b/util/queue/deque.go @@ -0,0 +1,413 @@ +package queue + +// minCapacity is the smallest capacity that deque may have. Must be power of 2 +// for bitwise modulus: x % n == x & (n - 1). +const minCapacity = 16 + +// Deque represents a single instance of the deque data structure. A Deque +// instance contains items of the type sepcified by the type argument. +type Deque[T any] struct { + buf []T + head int + tail int + count int + minCap int +} + +// New creates a new Deque, optionally setting the current and minimum capacity +// when non-zero values are given for these. The Deque instance returns +// operates on items of the type specified by the type argument. For example, +// to create a Deque that contains strings, +// +// stringDeque := deque.New[string]() +// +// To create a Deque with capacity to store 2048 ints without resizing, and +// that will not resize below space for 32 items when removing items: +// d := deque.New[int](2048, 32) +// +// To create a Deque that has not yet allocated memory, but after it does will +// never resize to have space for less than 64 items: +// d := deque.New[int](0, 64) +// +// Any size values supplied here are rounded up to the nearest power of 2. +func New[T any](size ...int) *Deque[T] { + var capacity, minimum int + if len(size) >= 1 { + capacity = size[0] + if len(size) >= 2 { + minimum = size[1] + } + } + + minCap := minCapacity + for minCap < minimum { + minCap <<= 1 + } + + var buf []T + if capacity != 0 { + bufSize := minCap + for bufSize < capacity { + bufSize <<= 1 + } + buf = make([]T, bufSize) + } + + return &Deque[T]{ + buf: buf, + minCap: minCap, + } +} + +// Cap returns the current capacity of the Deque. If q is nil, q.Cap() is zero. +func (q *Deque[T]) Cap() int { + if q == nil { + return 0 + } + return len(q.buf) +} + +// Len returns the number of elements currently stored in the queue. If q is +// nil, q.Len() is zero. 
+func (q *Deque[T]) Len() int { + if q == nil { + return 0 + } + return q.count +} + +// PushBack appends an element to the back of the queue. Implements FIFO when +// elements are removed with PopFront(), and LIFO when elements are removed +// with PopBack(). +func (q *Deque[T]) PushBack(elem T) { + q.growIfFull() + + q.buf[q.tail] = elem + // Calculate new tail position. + q.tail = q.next(q.tail) + q.count++ +} + +// PushFront prepends an element to the front of the queue. +func (q *Deque[T]) PushFront(elem T) { + q.growIfFull() + + // Calculate new head position. + q.head = q.prev(q.head) + q.buf[q.head] = elem + q.count++ +} + +// PopFront removes and returns the element from the front of the queue. +// Implements FIFO when used with PushBack(). If the queue is empty, the call +// panics. +func (q *Deque[T]) PopFront() T { + if q.count <= 0 { + panic("deque: PopFront() called on empty queue") + } + ret := q.buf[q.head] + var zero T + q.buf[q.head] = zero + // Calculate new head position. + q.head = q.next(q.head) + q.count-- + + q.shrinkIfExcess() + return ret +} + +// PopBack removes and returns the element from the back of the queue. +// Implements LIFO when used with PushBack(). If the queue is empty, the call +// panics. +func (q *Deque[T]) PopBack() T { + if q.count <= 0 { + panic("deque: PopBack() called on empty queue") + } + + // Calculate new tail position + q.tail = q.prev(q.tail) + + // Remove value at tail. + ret := q.buf[q.tail] + var zero T + q.buf[q.tail] = zero + q.count-- + + q.shrinkIfExcess() + return ret +} + +// Front returns the element at the front of the queue. This is the element +// that would be returned by PopFront(). This call panics if the queue is +// empty. +func (q *Deque[T]) Front() T { + if q.count <= 0 { + panic("deque: Front() called when empty") + } + return q.buf[q.head] +} + +// Back returns the element at the back of the queue. This is the element that +// would be returned by PopBack(). This call panics if the queue is empty. +func (q *Deque[T]) Back() T { + if q.count <= 0 { + panic("deque: Back() called when empty") + } + return q.buf[q.prev(q.tail)] +} + +// At returns the element at index i in the queue without removing the element +// from the queue. This method accepts only non-negative index values. At(0) +// refers to the first element and is the same as Front(). At(Len()-1) refers +// to the last element and is the same as Back(). If the index is invalid, the +// call panics. +// +// The purpose of At is to allow Deque to serve as a more general purpose +// circular buffer, where items are only added to and removed from the ends of +// the deque, but may be read from any place within the deque. Consider the +// case of a fixed-size circular log buffer: A new entry is pushed onto one end +// and when full the oldest is popped from the other end. All the log entries +// in the buffer must be readable without altering the buffer contents. +func (q *Deque[T]) At(i int) T { + if i < 0 || i >= q.count { + panic("deque: At() called with index out of range") + } + // bitwise modulus + return q.buf[(q.head+i)&(len(q.buf)-1)] +} + +// Set puts the element at index i in the queue. Set shares the same purpose +// than At() but perform the opposite operation. The index i is the same index +// defined by At(). If the index is invalid, the call panics. 
+func (q *Deque[T]) Set(i int, elem T) { + if i < 0 || i >= q.count { + panic("deque: Set() called with index out of range") + } + // bitwise modulus + q.buf[(q.head+i)&(len(q.buf)-1)] = elem +} + +// Clear removes all elements from the queue, but retains the current capacity. +// This is useful when repeatedly reusing the queue at high frequency to avoid +// GC during reuse. The queue will not be resized smaller as long as items are +// only added. Only when items are removed is the queue subject to getting +// resized smaller. +func (q *Deque[T]) Clear() { + // bitwise modulus + modBits := len(q.buf) - 1 + var zero T + for h := q.head; h != q.tail; h = (h + 1) & modBits { + q.buf[h] = zero + } + q.head = 0 + q.tail = 0 + q.count = 0 +} + +// Rotate rotates the deque n steps front-to-back. If n is negative, rotates +// back-to-front. Having Deque provide Rotate() avoids resizing that could +// happen if implementing rotation using only Pop and Push methods. If q.Len() +// is one or less, or q is nil, then Rotate does nothing. +func (q *Deque[T]) Rotate(n int) { + if q.Len() <= 1 { + return + } + // Rotating a multiple of q.count is same as no rotation. + n %= q.count + if n == 0 { + return + } + + modBits := len(q.buf) - 1 + // If no empty space in buffer, only move head and tail indexes. + if q.head == q.tail { + // Calculate new head and tail using bitwise modulus. + q.head = (q.head + n) & modBits + q.tail = q.head + return + } + + var zero T + + if n < 0 { + // Rotate back to front. + for ; n < 0; n++ { + // Calculate new head and tail using bitwise modulus. + q.head = (q.head - 1) & modBits + q.tail = (q.tail - 1) & modBits + // Put tail value at head and remove value at tail. + q.buf[q.head] = q.buf[q.tail] + q.buf[q.tail] = zero + } + return + } + + // Rotate front to back. + for ; n > 0; n-- { + // Put head value at tail and remove value at head. + q.buf[q.tail] = q.buf[q.head] + q.buf[q.head] = zero + // Calculate new head and tail using bitwise modulus. + q.head = (q.head + 1) & modBits + q.tail = (q.tail + 1) & modBits + } +} + +// Index returns the index into the Deque of the first item satisfying f(item), +// or -1 if none do. If q is nil, then -1 is always returned. Search is linear +// starting with index 0. +func (q *Deque[T]) Index(f func(T) bool) int { + if q.Len() > 0 { + modBits := len(q.buf) - 1 + for i := 0; i < q.count; i++ { + if f(q.buf[(q.head+i)&modBits]) { + return i + } + } + } + return -1 +} + +// RIndex is the same as Index, but searches from Back to Front. The index +// returned is from Front to Back, where index 0 is the index of the item +// returned by Front(). +func (q *Deque[T]) RIndex(f func(T) bool) int { + if q.Len() > 0 { + modBits := len(q.buf) - 1 + for i := q.count - 1; i >= 0; i-- { + if f(q.buf[(q.head+i)&modBits]) { + return i + } + } + } + return -1 +} + +// Insert is used to insert an element into the middle of the queue, before the +// element at the specified index. Insert(0,e) is the same as PushFront(e) and +// Insert(Len(),e) is the same as PushBack(e). Accepts only non-negative index +// values, and panics if index is out of range. +// +// Important: Deque is optimized for O(1) operations at the ends of the queue, +// not for operations in the the middle. Complexity of this function is +// constant plus linear in the lesser of the distances between the index and +// either of the ends of the queue. 
+func (q *Deque[T]) Insert(at int, item T) {
+	if at < 0 || at > q.count {
+		panic("deque: Insert() called with index out of range")
+	}
+	if at*2 < q.count {
+		q.PushFront(item)
+		front := q.head
+		for i := 0; i < at; i++ {
+			next := q.next(front)
+			q.buf[front], q.buf[next] = q.buf[next], q.buf[front]
+			front = next
+		}
+		return
+	}
+	swaps := q.count - at
+	q.PushBack(item)
+	back := q.prev(q.tail)
+	for i := 0; i < swaps; i++ {
+		prev := q.prev(back)
+		q.buf[back], q.buf[prev] = q.buf[prev], q.buf[back]
+		back = prev
+	}
+}
+
+// Remove removes and returns an element from the middle of the queue, at the
+// specified index. Remove(0) is the same as PopFront() and Remove(Len()-1) is
+// the same as PopBack(). Accepts only non-negative index values, and panics if
+// index is out of range.
+//
+// Important: Deque is optimized for O(1) operations at the ends of the queue,
+// not for operations in the middle. Complexity of this function is
+// constant plus linear in the lesser of the distances between the index and
+// either of the ends of the queue.
+func (q *Deque[T]) Remove(at int) T {
+	if at < 0 || at >= q.Len() {
+		panic("deque: Remove() called with index out of range")
+	}
+
+	rm := (q.head + at) & (len(q.buf) - 1)
+	if at*2 < q.count {
+		for i := 0; i < at; i++ {
+			prev := q.prev(rm)
+			q.buf[prev], q.buf[rm] = q.buf[rm], q.buf[prev]
+			rm = prev
+		}
+		return q.PopFront()
+	}
+	swaps := q.count - at - 1
+	for i := 0; i < swaps; i++ {
+		next := q.next(rm)
+		q.buf[rm], q.buf[next] = q.buf[next], q.buf[rm]
+		rm = next
+	}
+	return q.PopBack()
+}
+
+// SetMinCapacity sets a minimum capacity of 2^minCapacityExp. If the value of
+// the minimum capacity is less than or equal to the minimum allowed, then
+// capacity is set to the minimum allowed. This may be called at anytime to set
+// a new minimum capacity.
+//
+// Setting a larger minimum capacity may be used to prevent resizing when the
+// number of stored items changes frequently across a wide range.
+func (q *Deque[T]) SetMinCapacity(minCapacityExp uint) {
+	if 1<<minCapacityExp > minCapacity {
+		q.minCap = 1 << minCapacityExp
+	} else {
+		q.minCap = minCapacity
+	}
+}
+
+// prev returns the previous buffer position wrapping around buffer.
+func (q *Deque[T]) prev(i int) int {
+	return (i - 1) & (len(q.buf) - 1) // bitwise modulus
+}
+
+// next returns the next buffer position wrapping around buffer.
+func (q *Deque[T]) next(i int) int {
+	return (i + 1) & (len(q.buf) - 1) // bitwise modulus
+}
+
+// growIfFull resizes up if the buffer is full.
+func (q *Deque[T]) growIfFull() {
+	if q.count != len(q.buf) {
+		return
+	}
+	if len(q.buf) == 0 {
+		if q.minCap == 0 {
+			q.minCap = minCapacity
+		}
+		q.buf = make([]T, q.minCap)
+		return
+	}
+	q.resize()
+}
+
+// shrinkIfExcess resizes down if the buffer is only 1/4 full.
+func (q *Deque[T]) shrinkIfExcess() {
+	if len(q.buf) > q.minCap && (q.count<<2) == len(q.buf) {
+		q.resize()
+	}
+}
+
+// resize resizes the deque to fit exactly twice its current contents. This is
+// used to grow the queue when it is full, and also to shrink it when it is
+// only a quarter full.
+func (q *Deque[T]) resize() { + newBuf := make([]T, q.count<<1) + if q.tail > q.head { + copy(newBuf, q.buf[q.head:q.tail]) + } else { + n := copy(newBuf, q.buf[q.head:]) + copy(newBuf[n:], q.buf[:q.tail]) + } + + q.head = 0 + q.tail = q.count + q.buf = newBuf +} diff --git a/util/queue/deque_test.go b/util/queue/deque_test.go new file mode 100644 index 0000000..f1d8338 --- /dev/null +++ b/util/queue/deque_test.go @@ -0,0 +1,836 @@ +package queue + +import ( + "fmt" + "testing" + "unicode" +) + +func TestEmpty(t *testing.T) { + q := New[string]() + if q.Len() != 0 { + t.Error("q.Len() =", q.Len(), "expect 0") + } + if q.Cap() != 0 { + t.Error("expected q.Cap() == 0") + } + idx := q.Index(func(item string) bool { + return true + }) + if idx != -1 { + t.Error("should return -1 index for nil deque") + } + idx = q.RIndex(func(item string) bool { + return true + }) + if idx != -1 { + t.Error("should return -1 index for nil deque") + } +} + +func TestNil(t *testing.T) { + var q *Deque[int] + if q.Len() != 0 { + t.Error("expected q.Len() == 0") + } + if q.Cap() != 0 { + t.Error("expected q.Cap() == 0") + } + q.Rotate(5) + idx := q.Index(func(item int) bool { + return true + }) + if idx != -1 { + t.Error("should return -1 index for nil deque") + } + idx = q.RIndex(func(item int) bool { + return true + }) + if idx != -1 { + t.Error("should return -1 index for nil deque") + } +} + +func TestFrontBack(t *testing.T) { + var q Deque[string] + q.PushBack("foo") + q.PushBack("bar") + q.PushBack("baz") + if q.Front() != "foo" { + t.Error("wrong value at front of queue") + } + if q.Back() != "baz" { + t.Error("wrong value at back of queue") + } + + if q.PopFront() != "foo" { + t.Error("wrong value removed from front of queue") + } + if q.Front() != "bar" { + t.Error("wrong value remaining at front of queue") + } + if q.Back() != "baz" { + t.Error("wrong value remaining at back of queue") + } + + if q.PopBack() != "baz" { + t.Error("wrong value removed from back of queue") + } + if q.Front() != "bar" { + t.Error("wrong value remaining at front of queue") + } + if q.Back() != "bar" { + t.Error("wrong value remaining at back of queue") + } +} + +func TestGrowShrinkBack(t *testing.T) { + var q Deque[int] + size := minCapacity * 2 + + for i := 0; i < size; i++ { + if q.Len() != i { + t.Error("q.Len() =", q.Len(), "expected", i) + } + q.PushBack(i) + } + bufLen := len(q.buf) + + // Remove from back. 
+	for i := size; i > 0; i-- {
+		if q.Len() != i {
+			t.Error("q.Len() =", q.Len(), "expected", i)
+		}
+		x := q.PopBack()
+		if x != i-1 {
+			t.Error("q.PopBack() =", x, "expected", i-1)
+		}
+	}
+	if q.Len() != 0 {
+		t.Error("q.Len() =", q.Len(), "expected 0")
+	}
+	if len(q.buf) == bufLen {
+		t.Error("queue buffer did not shrink")
+	}
+}
+
+func TestGrowShrinkFront(t *testing.T) {
+	var q Deque[int]
+	size := minCapacity * 2
+
+	for i := 0; i < size; i++ {
+		if q.Len() != i {
+			t.Error("q.Len() =", q.Len(), "expected", i)
+		}
+		q.PushBack(i)
+	}
+	bufLen := len(q.buf)
+
+	// Remove from Front
+	for i := 0; i < size; i++ {
+		if q.Len() != size-i {
+			t.Error("q.Len() =", q.Len(), "expected", minCapacity*2-i)
+		}
+		x := q.PopFront()
+		if x != i {
+			t.Error("q.PopFront() =", x, "expected", i)
+		}
+	}
+	if q.Len() != 0 {
+		t.Error("q.Len() =", q.Len(), "expected 0")
+	}
+	if len(q.buf) == bufLen {
+		t.Error("queue buffer did not shrink")
+	}
+}
+
+func TestSimple(t *testing.T) {
+	var q Deque[int]
+
+	for i := 0; i < minCapacity; i++ {
+		q.PushBack(i)
+	}
+	if q.Front() != 0 {
+		t.Fatalf("expected 0 at front, got %d", q.Front())
+	}
+	if q.Back() != minCapacity-1 {
+		t.Fatalf("expected %d at back, got %d", minCapacity-1, q.Back())
+	}
+
+	for i := 0; i < minCapacity; i++ {
+		if q.Front() != i {
+			t.Error("peek", i, "had value", q.Front())
+		}
+		x := q.PopFront()
+		if x != i {
+			t.Error("remove", i, "had value", x)
+		}
+	}
+
+	q.Clear()
+	for i := 0; i < minCapacity; i++ {
+		q.PushFront(i)
+	}
+	for i := minCapacity - 1; i >= 0; i-- {
+		x := q.PopFront()
+		if x != i {
+			t.Error("remove", i, "had value", x)
+		}
+	}
+}
+
+func TestBufferWrap(t *testing.T) {
+	var q Deque[int]
+
+	for i := 0; i < minCapacity; i++ {
+		q.PushBack(i)
+	}
+
+	for i := 0; i < 3; i++ {
+		q.PopFront()
+		q.PushBack(minCapacity + i)
+	}
+
+	for i := 0; i < minCapacity; i++ {
+		if q.Front() != i+3 {
+			t.Error("peek", i, "had value", q.Front())
+		}
+		q.PopFront()
+	}
+}
+
+func TestBufferWrapReverse(t *testing.T) {
+	var q Deque[int]
+
+	for i := 0; i < minCapacity; i++ {
+		q.PushFront(i)
+	}
+	for i := 0; i < 3; i++ {
+		q.PopBack()
+		q.PushFront(minCapacity + i)
+	}
+
+	for i := 0; i < minCapacity; i++ {
+		if q.Back() != i+3 {
+			t.Error("peek", i, "had value", q.Front())
+		}
+		q.PopBack()
+	}
+}
+
+func TestLen(t *testing.T) {
+	var q Deque[int]
+
+	if q.Len() != 0 {
+		t.Error("empty queue length not 0")
+	}
+
+	for i := 0; i < 1000; i++ {
+		q.PushBack(i)
+		if q.Len() != i+1 {
+			t.Error("adding: queue with", i, "elements has length", q.Len())
+		}
+	}
+	for i := 0; i < 1000; i++ {
+		q.PopFront()
+		if q.Len() != 1000-i-1 {
+			t.Error("removing: queue with", 1000-i-1, "elements has length", q.Len())
+		}
+	}
+}
+
+func TestBack(t *testing.T) {
+	var q Deque[int]
+
+	for i := 0; i < minCapacity+5; i++ {
+		q.PushBack(i)
+		if q.Back() != i {
+			t.Errorf("Back returned wrong value")
+		}
+	}
+}
+
+func TestNew(t *testing.T) {
+	minCap := 64
+	q := New[string](0, minCap)
+	if q.Cap() != 0 {
+		t.Fatal("should not have allocated mem yet")
+	}
+	q.PushBack("foo")
+	q.PopFront()
+	if q.Len() != 0 {
+		t.Fatal("Len() should return 0")
+	}
+	if q.Cap() != minCap {
+		t.Fatalf("wrong capacity expected %d, got %d", minCap, q.Cap())
+	}
+
+	curCap := 128
+	q = New[string](curCap, minCap)
+	if q.Cap() != curCap {
+		t.Fatalf("Cap() should return %d, got %d", curCap, q.Cap())
+	}
+	if q.Len() != 0 {
+		t.Fatalf("Len() should return 0")
+	}
+	q.PushBack("foo")
+	if q.Cap() != curCap {
+		t.Fatalf("Cap() should return %d, got %d", curCap, q.Cap())
+	}
+}
+
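For orientation, dispatch.go above uses this Deque as a per-queueId FIFO in which the front element is always the task currently in flight: processTask appends each new task (starting it immediately only when the queue was empty), and processqueueEvent pops the finished task and hands the new front to a worker. The following is a minimal sketch of that bookkeeping, assumed to live next to this package; the serializer type and its start callback are illustrative and not part of the patch.

// serializer is an illustrative reduction of the per-queueId bookkeeping in
// dispatch.processTask / dispatch.processqueueEvent. Not part of the patch.
type serializer struct {
	pending map[int64]*Deque[func()] // front element = task currently in flight
	start   func(fn func())          // hands a task to a worker; the worker later calls done(queueId)
}

func newSerializer(start func(func())) *serializer {
	return &serializer{pending: map[int64]*Deque[func()]{}, start: start}
}

// submit mirrors dispatch.processTask for queueId != 0.
func (s *serializer) submit(queueId int64, fn func()) {
	q := s.pending[queueId]
	if q == nil {
		q = &Deque[func()]{}
		s.pending[queueId] = q
	}
	if q.Len() == 0 { // nothing in flight for this queue, run it right away
		s.start(fn)
	}
	q.PushBack(fn) // keep it enqueued so done() knows a task is in flight
}

// done mirrors dispatch.processqueueEvent: called after the in-flight task finishes.
func (s *serializer) done(queueId int64) {
	q := s.pending[queueId]
	if q == nil {
		return
	}
	q.PopFront() // drop the finished task
	if q.Len() > 0 {
		s.start(q.Front()) // promote and start the next queued task
	}
}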
+func checkRotate(t *testing.T, size int) { + var q Deque[int] + for i := 0; i < size; i++ { + q.PushBack(i) + } + + for i := 0; i < q.Len(); i++ { + x := i + for n := 0; n < q.Len(); n++ { + if q.At(n) != x { + t.Fatalf("a[%d] != %d after rotate and copy", n, x) + } + x++ + if x == q.Len() { + x = 0 + } + } + q.Rotate(1) + if q.Back() != i { + t.Fatal("wrong value during rotation") + } + } + for i := q.Len() - 1; i >= 0; i-- { + q.Rotate(-1) + if q.Front() != i { + t.Fatal("wrong value during reverse rotation") + } + } +} + +func TestRotate(t *testing.T) { + checkRotate(t, 10) + checkRotate(t, minCapacity) + checkRotate(t, minCapacity+minCapacity/2) + + var q Deque[int] + for i := 0; i < 10; i++ { + q.PushBack(i) + } + q.Rotate(11) + if q.Front() != 1 { + t.Error("rotating 11 places should have been same as one") + } + q.Rotate(-21) + if q.Front() != 0 { + t.Error("rotating -21 places should have been same as one -1") + } + q.Rotate(q.Len()) + if q.Front() != 0 { + t.Error("should not have rotated") + } + q.Clear() + q.PushBack(0) + q.Rotate(13) + if q.Front() != 0 { + t.Error("should not have rotated") + } +} + +func TestAt(t *testing.T) { + var q Deque[int] + + for i := 0; i < 1000; i++ { + q.PushBack(i) + } + + // Front to back. + for j := 0; j < q.Len(); j++ { + if q.At(j) != j { + t.Errorf("index %d doesn't contain %d", j, j) + } + } + + // Back to front + for j := 1; j <= q.Len(); j++ { + if q.At(q.Len()-j) != q.Len()-j { + t.Errorf("index %d doesn't contain %d", q.Len()-j, q.Len()-j) + } + } +} + +func TestSet(t *testing.T) { + var q Deque[int] + + for i := 0; i < 1000; i++ { + q.PushBack(i) + q.Set(i, i+50) + } + + // Front to back. + for j := 0; j < q.Len(); j++ { + if q.At(j) != j+50 { + t.Errorf("index %d doesn't contain %d", j, j+50) + } + } +} + +func TestClear(t *testing.T) { + var q Deque[int] + + for i := 0; i < 100; i++ { + q.PushBack(i) + } + if q.Len() != 100 { + t.Error("push: queue with 100 elements has length", q.Len()) + } + cap := len(q.buf) + q.Clear() + if q.Len() != 0 { + t.Error("empty queue length not 0 after clear") + } + if len(q.buf) != cap { + t.Error("queue capacity changed after clear") + } + + // Check that there are no remaining references after Clear() + for i := 0; i < len(q.buf); i++ { + if q.buf[i] != 0 { + t.Error("queue has non-nil deleted elements after Clear()") + break + } + } +} + +func TestIndex(t *testing.T) { + var q Deque[rune] + for _, x := range "Hello, 世界" { + q.PushBack(x) + } + idx := q.Index(func(item rune) bool { + c := item + return unicode.Is(unicode.Han, c) + }) + if idx != 7 { + t.Fatal("Expected index 7, got", idx) + } + idx = q.Index(func(item rune) bool { + c := item + return c == 'H' + }) + if idx != 0 { + t.Fatal("Expected index 0, got", idx) + } + idx = q.Index(func(item rune) bool { + return false + }) + if idx != -1 { + t.Fatal("Expected index -1, got", idx) + } +} + +func TestRIndex(t *testing.T) { + var q Deque[rune] + for _, x := range "Hello, 世界" { + q.PushBack(x) + } + idx := q.RIndex(func(item rune) bool { + c := item + return unicode.Is(unicode.Han, c) + }) + if idx != 8 { + t.Fatal("Expected index 8, got", idx) + } + idx = q.RIndex(func(item rune) bool { + c := item + return c == 'H' + }) + if idx != 0 { + t.Fatal("Expected index 0, got", idx) + } + idx = q.RIndex(func(item rune) bool { + return false + }) + if idx != -1 { + t.Fatal("Expected index -1, got", idx) + } +} + +func TestInsert(t *testing.T) { + q := new(Deque[rune]) + for _, x := range "ABCDEFG" { + q.PushBack(x) + } + q.Insert(4, 'x') // ABCDxEFG + 
if q.At(4) != 'x' { + t.Error("expected x at position 4, got", q.At(4)) + } + + q.Insert(2, 'y') // AByCDxEFG + if q.At(2) != 'y' { + t.Error("expected y at position 2") + } + if q.At(5) != 'x' { + t.Error("expected x at position 5") + } + + q.Insert(0, 'b') // bAByCDxEFG + if q.Front() != 'b' { + t.Error("expected b inserted at front, got", q.Front()) + } + + q.Insert(q.Len(), 'e') // bAByCDxEFGe + + for i, x := range "bAByCDxEFGe" { + if q.PopFront() != x { + t.Error("expected", x, "at position", i) + } + } + + qs := New[string](16) + + for i := 0; i < qs.Cap(); i++ { + qs.PushBack(fmt.Sprint(i)) + } + // deque: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 + // buffer: [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] + for i := 0; i < qs.Cap()/2; i++ { + qs.PopFront() + } + // deque: 8 9 10 11 12 13 14 15 + // buffer: [_,_,_,_,_,_,_,_,8,9,10,11,12,13,14,15] + for i := 0; i < qs.Cap()/4; i++ { + qs.PushBack(fmt.Sprint(qs.Cap() + i)) + } + // deque: 8 9 10 11 12 13 14 15 16 17 18 19 + // buffer: [16,17,18,19,_,_,_,_,8,9,10,11,12,13,14,15] + + at := qs.Len() - 2 + qs.Insert(at, "x") + // deque: 8 9 10 11 12 13 14 15 16 17 x 18 19 + // buffer: [16,17,x,18,19,_,_,_,8,9,10,11,12,13,14,15] + if qs.At(at) != "x" { + t.Error("expected x at position", at) + } + if qs.At(at) != "x" { + t.Error("expected x at position", at) + } + + qs.Insert(2, "y") + // deque: 8 9 y 10 11 12 13 14 15 16 17 x 18 19 + // buffer: [16,17,x,18,19,_,_,8,9,y,10,11,12,13,14,15] + if qs.At(2) != "y" { + t.Error("expected y at position 2") + } + if qs.At(at+1) != "x" { + t.Error("expected x at position 5") + } + + qs.Insert(0, "b") + // deque: b 8 9 y 10 11 12 13 14 15 16 17 x 18 19 + // buffer: [16,17,x,18,19,_,b,8,9,y,10,11,12,13,14,15] + if qs.Front() != "b" { + t.Error("expected b inserted at front, got", qs.Front()) + } + + qs.Insert(qs.Len(), "e") + if qs.Cap() != qs.Len() { + t.Fatal("Expected full buffer") + } + // deque: b 8 9 y 10 11 12 13 14 15 16 17 x 18 19 e + // buffer: [16,17,x,18,19,e,b,8,9,y,10,11,12,13,14,15] + for i, x := range []string{"16", "17", "x", "18", "19", "e", "b", "8", "9", "y", "10", "11", "12", "13", "14", "15"} { + if qs.buf[i] != x { + t.Error("expected", x, "at buffer position", i) + } + } + for i, x := range []string{"b", "8", "9", "y", "10", "11", "12", "13", "14", "15", "16", "17", "x", "18", "19", "e"} { + if qs.Front() != x { + t.Error("expected", x, "at position", i, "got", qs.Front()) + } + qs.PopFront() + } +} + +func TestRemove(t *testing.T) { + q := new(Deque[rune]) + for _, x := range "ABCDEFG" { + q.PushBack(x) + } + + if q.Remove(4) != 'E' { // ABCDFG + t.Error("expected E from position 4") + } + + if q.Remove(2) != 'C' { // ABDFG + t.Error("expected C at position 2") + } + if q.Back() != 'G' { + t.Error("expected G at back") + } + + if q.Remove(0) != 'A' { // BDFG + t.Error("expected to remove A from front") + } + if q.Front() != 'B' { + t.Error("expected G at back") + } + + if q.Remove(q.Len()-1) != 'G' { // BDF + t.Error("expected to remove G from back") + } + if q.Back() != 'F' { + t.Error("expected F at back") + } + + if q.Len() != 3 { + t.Error("wrong length") + } +} + +func TestFrontBackOutOfRangePanics(t *testing.T) { + const msg = "should panic when peeking empty queue" + var q Deque[int] + assertPanics(t, msg, func() { + q.Front() + }) + assertPanics(t, msg, func() { + q.Back() + }) + + q.PushBack(1) + q.PopFront() + + assertPanics(t, msg, func() { + q.Front() + }) + assertPanics(t, msg, func() { + q.Back() + }) +} + +func TestPopFrontOutOfRangePanics(t *testing.T) { + var q 
Deque[int] + + assertPanics(t, "should panic when removing empty queue", func() { + q.PopFront() + }) + + q.PushBack(1) + q.PopFront() + + assertPanics(t, "should panic when removing emptied queue", func() { + q.PopFront() + }) +} + +func TestPopBackOutOfRangePanics(t *testing.T) { + var q Deque[int] + + assertPanics(t, "should panic when removing empty queue", func() { + q.PopBack() + }) + + q.PushBack(1) + q.PopBack() + + assertPanics(t, "should panic when removing emptied queue", func() { + q.PopBack() + }) +} + +func TestAtOutOfRangePanics(t *testing.T) { + var q Deque[int] + + q.PushBack(1) + q.PushBack(2) + q.PushBack(3) + + assertPanics(t, "should panic when negative index", func() { + q.At(-4) + }) + + assertPanics(t, "should panic when index greater than length", func() { + q.At(4) + }) +} + +func TestSetOutOfRangePanics(t *testing.T) { + var q Deque[int] + + q.PushBack(1) + q.PushBack(2) + q.PushBack(3) + + assertPanics(t, "should panic when negative index", func() { + q.Set(-4, 1) + }) + + assertPanics(t, "should panic when index greater than length", func() { + q.Set(4, 1) + }) +} + +func TestInsertOutOfRangePanics(t *testing.T) { + q := new(Deque[string]) + + assertPanics(t, "should panic when inserting out of range", func() { + q.Insert(1, "X") + }) + + q.PushBack("A") + + assertPanics(t, "should panic when inserting at negative index", func() { + q.Insert(-1, "Y") + }) + + assertPanics(t, "should panic when inserting out of range", func() { + q.Insert(2, "B") + }) +} + +func TestRemoveOutOfRangePanics(t *testing.T) { + q := new(Deque[string]) + + assertPanics(t, "should panic when removing from empty queue", func() { + q.Remove(0) + }) + + q.PushBack("A") + + assertPanics(t, "should panic when removing at negative index", func() { + q.Remove(-1) + }) + + assertPanics(t, "should panic when removing out of range", func() { + q.Remove(1) + }) +} + +func TestSetMinCapacity(t *testing.T) { + var q Deque[string] + exp := uint(8) + q.SetMinCapacity(exp) + q.PushBack("A") + if q.minCap != 1<0 { + return pq.priorityQueueSlice[0] + } + + return nil +} func (pq *PriorityQueue) Len() int { return len(pq.priorityQueueSlice) }
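
End-to-end, a service built on this patch can push blocking work off its own goroutine while keeping per-entity ordering. Below is a minimal usage sketch, assuming the Concurrent has been wired into the Module as shown in service.go above; GameService, loadPlayer, and queryPlayerFromDB are illustrative names, not part of the patch.

package game

import (
	"github.com/duanhf2012/origin/log"
	"github.com/duanhf2012/origin/service"
)

type GameService struct {
	service.Service
}

func (gs *GameService) OnInit() error {
	// Size the worker pool from the CPU count: 1x for CPU-bound work,
	// 2x or more for I/O-bound work.
	gs.OpenConcurrentByNumCPU(2)
	return nil
}

// queryPlayerFromDB stands in for any blocking call (database, RPC, file I/O).
func queryPlayerFromDB(playerId int64) string { return "player-doc" }

func (gs *GameService) loadPlayer(playerId int64) {
	var doc string
	// Tasks sharing the same non-zero queueId run one at a time in submission
	// order; the callback is delivered back on the service goroutine via the
	// callback channel drained in Service.Run.
	gs.AsyncDoByQueue(playerId, func() {
		doc = queryPlayerFromDB(playerId) // executes on a worker goroutine
	}, func(err error) {
		if err != nil {
			log.SError("load player failed: ", err.Error())
			return
		}
		// Service state can be touched safely here.
		_ = doc
	})
}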