Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 130 additions & 81 deletions internal/node/pool.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package node

import (
"container/list"
"errors"
"log"
"time"
Expand All @@ -12,8 +11,9 @@ import (
)

type ContainerPool struct {
busy *list.List
idle *list.List
// for better efficiently we now use slices here instead of linked lists
busy []*container.Container
idle []*container.Container
}

var NoWarmFoundErr = errors.New("no warm container is available")
Expand All @@ -30,20 +30,22 @@ func GetContainerPool(f *function.Function) *ContainerPool {
}

func (fp *ContainerPool) popIdleContainer() (*container.Container, bool) {
// TODO: picking most-recent / least-recent container might be better?
elem := fp.idle.Front()
if elem == nil {
n := len(fp.idle)
if n == 0 {
return nil, false
}
// LIFO (maybe better for cache locality)
c := fp.idle[n-1]

c := fp.idle.Remove(elem).(*container.Container)
fp.idle[n-1] = nil // to favor garbage collection
fp.idle = fp.idle[:n-1] // pop the slice

return c, true
}

func (fp *ContainerPool) getReusableContainer(maxConcurrency int16) (*container.Container, bool) {
for elem := fp.busy.Front(); elem != nil; elem = elem.Next() {
c := elem.Value.(*container.Container)
for _, elem := range fp.busy {
c := elem
if c.RequestsCount < maxConcurrency {
return c, true
}
Expand All @@ -53,11 +55,10 @@ func (fp *ContainerPool) getReusableContainer(maxConcurrency int16) (*container.
}

func newContainerPool() *ContainerPool {
fp := &ContainerPool{}
fp.busy = list.New()
fp.idle = list.New()

return fp
return &ContainerPool{
busy: make([]*container.Container, 0, 10),
idle: make([]*container.Container, 0, 10),
}
}

func acquireNewMemory(mem int64, forWarmPool bool) bool {
Expand Down Expand Up @@ -119,7 +120,7 @@ func acquireWarmContainer(f *function.Function) (*container.Container, error) {

// add container to the busy pool
c.RequestsCount = 1
fp.busy.PushBack(c)
fp.busy = append(fp.busy, c)

log.Printf("Using warm %s for %s. Now: %v", c.ID, f, &LocalResources)
return c, nil
Expand Down Expand Up @@ -153,24 +154,30 @@ func HandleCompletion(cont *container.Container, f *function.Function) {
if cont.RequestsCount == 0 {
// the container is now idle and must be moved to the warm pool
fp := GetContainerPool(f)
// we must update the busy list by removing this element
var deleted interface{}
elem := fp.busy.Front()
for ok := elem != nil; ok; ok = elem != nil {
if elem.Value.(*container.Container) == cont {
deleted = fp.busy.Remove(elem) // delete the element from the busy list
// Search the container index in the slice
idx := -1
for i, c := range fp.busy {
if c == cont { // with slices, we can compare pointers
idx = i
break
}
elem = elem.Next()
}
if deleted == nil {
log.Println("Failed to release a container!")

if idx == -1 {
log.Println("Failed to release a container! Not found in busy pool.")
return
}

// swap then pop from the slice. This way we don't have to
lastIdx := len(fp.busy) - 1
fp.busy[idx] = fp.busy[lastIdx] // swap between last element and the one we want to delete
fp.busy[lastIdx] = nil // nil to favor garbage collection
fp.busy = fp.busy[:lastIdx] // pop the slice

// finally, we add the container to the idle pool
d := time.Duration(config.GetInt(config.CONTAINER_EXPIRATION_TIME, 600)) * time.Second
cont.ExpirationTime = time.Now().Add(d).UnixNano()
fp.idle.PushBack(cont)
fp.idle = append(fp.idle, cont)

LocalResources.usedCPUs -= f.CPUDemand
LocalResources.busyPoolUsedMem -= f.MemoryMB
Expand Down Expand Up @@ -216,10 +223,10 @@ func NewContainerWithAcquiredResources(fun *function.Function, startAsIdle bool,

fp := GetContainerPool(fun)
if startAsIdle {
fp.idle.PushBack(cont)
fp.idle = append(fp.idle, cont)
} else {
cont.RequestsCount = 1
fp.busy.PushBack(cont) // We immediately mark it as busy
fp.busy = append(fp.busy, cont) // We immediately mark it as busy
}

return cont, nil
Expand All @@ -231,6 +238,7 @@ func NewContainerWithAcquiredResourcesAsync(fun *function.Function, okCallback f
if err != nil {
log.Printf("Failed container creation: %v\n", err)
errCallback(err)
return
}

LocalResources.Lock()
Expand All @@ -243,63 +251,80 @@ func NewContainerWithAcquiredResourcesAsync(fun *function.Function, okCallback f

fp := GetContainerPool(fun)
cont.RequestsCount = 1
fp.busy.PushBack(cont) // We immediately mark it as busy
fp.busy = append(fp.busy, cont) // We immediately mark it as busy
okCallback(cont)
}()
}

type itemToDismiss struct {
contID container.ContainerID
cont *container.Container
pool *ContainerPool
elem *list.Element
memory int64
}

// dismissContainer ... this function is used to get free memory used for a new container
// 2-phases: first, we find idle container and collect them as a slice, second (cleanup phase) we delete the container only and only if
// the sum of their memory is >= requiredMemoryMB is
// removeContainerFromIdle removes a specific container from the idle pool.
func (fp *ContainerPool) removeContainerFromIdle(target *container.Container) bool {
for i, c := range fp.idle {
if c == target { // pointer comparison
// swap with the last element
lastIdx := len(fp.idle) - 1
fp.idle[i] = fp.idle[lastIdx]

fp.idle[lastIdx] = nil // for better garbage collection we delete the last element content before slicing

fp.idle = fp.idle[:lastIdx]
return true
}
}
return false
}

// dismissContainer attempts to free memory by dismissing idle containers.
// It works in 2 phases: research (collect candidates) and cleanup (destroy them).
// Containers are actually cleaned only if they free enough memory for the new function/container
func dismissContainer(requiredMemoryMB int64) (bool, error) {
log.Printf("Trying to dismiss containers to free up at least %d MB", requiredMemoryMB)
var cleanedMB int64 = 0
var containerToDismiss []itemToDismiss

//first phase, research
// Phase 1: Research
// We iterate through all pools to find idle containers to (potentially) remove.
for _, funPool := range LocalResources.containerPools {
if funPool.idle.Len() > 0 {
// every container into the funPool has the same memory (same function)
//so it is not important which one you destroy
elem := funPool.idle.Front()
// container in the same pool need same memory
memory, _ := container.GetMemoryMB(elem.Value.(*container.Container).ID)

for elem != nil {
contID := elem.Value.(*container.Container).ID
containerToDismiss = append(containerToDismiss, itemToDismiss{contID: contID, pool: funPool, elem: elem, memory: memory})

if len(funPool.idle) > 0 {
for _, cont := range funPool.idle {
memory, _ := container.GetMemoryMB(cont.ID)

// We collect the pointer to the container and the pool reference
containerToDismiss = append(containerToDismiss,
itemToDismiss{cont: cont, pool: funPool, memory: memory})

cleanedMB += memory
if cleanedMB >= requiredMemoryMB {
goto cleanup
}
elem = elem.Next()
}
}
}

cleanup: // second phase, cleanup
// memory check
if cleanedMB >= requiredMemoryMB {
cleanup: // Phase 2: Cleanup
if cleanedMB >= requiredMemoryMB { // if we'd actually free enough memory we do it, otherwise there's no point
for _, item := range containerToDismiss {
item.pool.idle.Remove(item.elem) // remove the container from the funPool
err := container.Destroy(item.contID) // destroy the container
if err != nil {
return false, err
if item.pool.removeContainerFromIdle(item.cont) {
// Destroy the actual container resources (Docker/Containerd)
err := container.Destroy(item.cont.ID)
if err != nil {
return false, err
}
LocalResources.warmPoolUsedMem -= item.memory
}
LocalResources.warmPoolUsedMem -= item.memory
}
return true, nil
} else {
log.Printf("Not enough containers to free up at least %d MB (avail to dismiss: %d)", requiredMemoryMB, cleanedMB)
return false, nil
return false, errors.New("not enough containers to free up memory")

}
return true, nil
}

// DeleteExpiredContainer is called by the container cleaner
Expand All @@ -311,24 +336,44 @@ func DeleteExpiredContainer() {
defer LocalResources.Unlock()

for _, pool := range LocalResources.containerPools {
elem := pool.idle.Front()
for ok := elem != nil; ok; ok = elem != nil {
warm := elem.Value.(*container.Container)
// Index to track the position of kept elements
// Basically, since we now have slices, we want to avoid modifying the slice length while we're iterating
// over it. So, we keep track of the element (containers) we want to keep, then we move them to the front of
// the slice, and finally we cut the slice.
// E.g.: if we have 8 containers, but 3 of them are expired, we cycle over the 8 containers, we put the 5 we need
// to keep from the index 0 to 4, and then we cut the slice after the fifth element.
// Finally, we "nil-out" the last 3 elements (in this example), to favor garbage collection.
kept := 0

for _, warm := range pool.idle {
if now > warm.ExpirationTime {
temp := elem
elem = elem.Next()
pool.idle.Remove(temp) // remove the expired element
// remove the expired container

// Update resources
memory, _ := container.GetMemoryMB(warm.ID)
LocalResources.warmPoolUsedMem -= memory

// Destroy the actual container
err := container.Destroy(warm.ID)
if err != nil {
log.Printf("Error while destroying container %s: %s\n", warm.ID, err)
}

} else {
elem = elem.Next()
// container is still valid: Keep it
// Rewrite the element at the 'kept' position
pool.idle[kept] = warm
kept++ // position to write the (eventual) next container we need to keep
}
}

// finally, we set as nil the references to the containers we deleted
for i := kept; i < len(pool.idle); i++ {
pool.idle[i] = nil
}

// Reslice to the new length
pool.idle = pool.idle[:kept]
}
}

Expand All @@ -343,21 +388,21 @@ func ShutdownWarmContainersFor(f *function.Function) {
return
}

containersToDelete := make([]container.ContainerID, 0)
containersToDelete := make([]container.ContainerID, 0, len(fp.idle)) // we already know how long it'll need to be, so no need for reallocation

elem := fp.idle.Front()
for ok := elem != nil; ok; ok = elem != nil {
warmed := elem.Value.(*container.Container)
temp := elem
elem = elem.Next()
// Iterate over the idle slice directly
for i, warmed := range fp.idle {
log.Printf("Removing container with ID %s\n", warmed.ID)
fp.idle.Remove(temp)

memory, _ := container.GetMemoryMB(warmed.ID)
LocalResources.warmPoolUsedMem -= memory
containersToDelete = append(containersToDelete, warmed.ID)
fp.idle[i] = nil
}

// clear the slice
fp.idle = fp.idle[:0]

go func(contIDs []container.ContainerID) {
for _, contID := range contIDs {
// No need to update available resources here
Expand All @@ -382,34 +427,38 @@ func ShutdownAllContainers() {
continue // should not happen
}

for elem := pool.idle.Front(); elem != nil; elem = elem.Next() {
warmed := elem.Value.(*container.Container)
temp := elem
for i, warmed := range pool.idle {
log.Printf("Removing container with ID %s\n", warmed.ID)
pool.idle.Remove(temp)

err := container.Destroy(warmed.ID)
if err != nil {
log.Printf("Error while destroying container %s: %s", warmed.ID, err)
}
LocalResources.warmPoolUsedMem -= functionDescriptor.MemoryMB

// nil to help garbage collection
pool.idle[i] = nil
}
// Reset the idle slice
pool.idle = pool.idle[:0]

for elem := pool.busy.Front(); elem != nil; elem = elem.Next() {
contID := elem.Value.(*container.Container).ID
temp := elem
log.Printf("Removing container with ID %s\n", contID)
pool.idle.Remove(temp)
// now we do the same but for busy containers
for i, busyCont := range pool.busy {
log.Printf("Removing container with ID %s\n", busyCont.ID)

err := container.Destroy(contID)
err := container.Destroy(busyCont.ID)
if err != nil {
log.Printf("failed to destroy container %s: %v\n", contID, err)
log.Printf("failed to destroy container %s: %v\n", busyCont.ID, err)
continue
}

LocalResources.usedCPUs -= functionDescriptor.CPUDemand
LocalResources.busyPoolUsedMem -= functionDescriptor.MemoryMB

pool.busy[i] = nil
}
// Reset the busy slice capacity
pool.busy = pool.busy[:0]
}
}

Expand All @@ -419,7 +468,7 @@ func WarmStatus() map[string]int {
defer LocalResources.RUnlock()
warmPool := make(map[string]int)
for funcName, pool := range LocalResources.containerPools {
warmPool[funcName] = pool.idle.Len()
warmPool[funcName] = len(pool.idle)
}

return warmPool
Expand Down