8000 change worker mem, hashFun,c hashZygote · open-lambda/open-lambda@f806b7e · GitHub
[go: up one dir, main page]

Skip to content

Commit f806b7e

Browse files
committed
change worker mem, hashFun,c hashZygote
1 parent fdaed77 commit f806b7e

File tree

10 files changed

+139
-24
lines changed

10 files changed

+139
-24
lines changed

src/boss/boss.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const (
2323
RESTART_PATH = "/restart"
2424
CHANGE_LB_PATH = "/change_lb"
2525
CHANGE_TREE_PATH = "/change_tree"
26+
CHANGE_MEM_PATH = "/change_mem"
2627
)
2728

2829
type Boss struct {
@@ -128,6 +129,43 @@ func (b *Boss) ChangeTree(w http.ResponseWriter, r *http.Request) {
128129
b.workerPool.ChangeTree(new_tree)
129130
}
130131

132+
func (b *Boss) ChangeMem(w http.ResponseWriter, r *http.Request) {
133+
if r.Method != "POST" {
134+
w.WriteHeader(http.StatusMethodNotAllowed)
135+
_, err := w.Write([]byte("POST a policy to /change_lb\n"))
136+
if err != nil {
137+
log.Printf("(1) could not write web response: %s\n", err.Error())
138+
}
139+
return
140+
}
141+
142+
contents, err := io.ReadAll(r.Body)
143+
if err != nil {
144+
w.WriteHeader(http.StatusInternalServerError)
145+
_, err := w.Write([]byte("could not read body of web request\n"))
146+
if err != nil {
147+
log.Printf("(2) could not write web response: %s\n", err.Error())
148+
}
149+
return
150+
}
151+
152+
newMemStr := string(contents)
153+
newMem, err := strconv.Atoi(newMemStr)
154+
if err != nil {
155+
w.WriteHeader(http.StatusBadRequest)
156+
_, err := w.Write([]byte("invalid integer value\n"))
157+
if err != nil {
158+
log.Printf("(3) could not write web response: %s\n", err.Error())
159+
}
160+
return
161+
}
162+
163+
Conf.Worker_mem = newMem
164+
165+
b.workerPool.ChangeMem(newMem)
166+
b.BossStatus(w, r)
167+
}
168+
131169
func (b *Boss) RestartWorkers(w http.ResponseWriter, r *http.Request) {
132170
b.workerPool.Restart()
133171
b.BossStatus(w, r)
@@ -193,6 +231,7 @@ func BossMain() (err error) {
193231
http.HandleFunc(RESTART_PATH, boss.RestartWorkers)
194232
http.HandleFunc(CHANGE_LB_PATH, boss.ChangeLb)
195233
http.HandleFunc(CHANGE_TREE_PATH, boss.ChangeTree)
234+
http.HandleFunc(CHANGE_MEM_PATH, boss.ChangeMem)
196235

197236
// clean up if signal hits us
198237
c := make(chan os.Signal, 1)

src/boss/cloudvm/api.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ const (
2626
snapshot = "ol-boss-new-snapshot"
2727
)
2828

29-
var tree_path string
30-
3129
const (
3230
test_path = "/home/azureuser/paper-tree-cache/analysis/17/"
3331
ssh_key_path = "/home/azureuser/.ssh/ol-boss_key.pem"

src/boss/cloudvm/azure_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func (worker *Worker) start(firstTime bool) error {
142142
run_one_time := "sudo python3 run_worker.py"
143143

144144
var run_worker_up string
145-
run_worker_up = fmt.Sprintf("sudo ./ol worker up -i ol-min -d -o import_cache_tree=%s,worker_url=0.0.0.0,features.warmup=false,limits.mem_mb=500,mem_pool_mb=32768,trace.evictor=true", tree_path)
145+
run_worker_up = fmt.Sprintf("sudo ./ol worker up -i ol-min -d -o import_cache_tree=%s,worker_url=0.0.0.0,features.warmup=false,limits.mem_mb=500,mem_pool_mb=%d,trace.evictor=true", tree_path, worker_mem)
146146

147147
var cmd string
148148
if firstTime {

src/boss/cloudvm/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414

1515
var GcpConf *GcpConfig
1616
var AzureConf *AzureConfig
17+
var tree_path string
18+
var worker_mem int
1719

1820
type GcpConfig struct {
1921
DiskSizeGb int `json:"disk_size_gb"`
@@ -24,6 +26,10 @@ func LoadTreePath(path string) {
2426
tree_path = path
2527
}
2628

29+
func LoadWorkerMem(mem int) {
30+
worker_mem = mem
31+
}
32+
2733
func GetGcpConfigDefaults() *GcpConfig {
2834
return &GcpConfig{
2935
DiskSizeGb: 30,

src/boss/cloudvm/worker.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -522,8 +522,14 @@ func (pool *WorkerPool) RunLambda(w http.ResponseWriter, r *http.Request) {
522522
}
523523
}
524524

525-
if loadbalancer.Lb.LbType == loadbalancer.Hash {
526-
targetGroup = loadbalancer.HashGetGroup(pkgs, len(pool.workers[RUNNING]))
525+
if loadbalancer.Lb.LbType == loadbalancer.HashFunc {
526+
targetGroup = loadbalancer.HashFuncGetGroup(img, len(pool.workers[RUNNING]))
527+
// fmt.Println(targetGroup)
528+
targetGroups = append(targetGroups, targetGroup)
529+
}
530+
531+
if loadbalancer.Lb.LbType == loadbalancer.HashZygote {
532+
targetGroup = loadbalancer.HashZygoteGetGroup(pkgs, len(pool.workers[RUNNING]))
527533
// fmt.Println(targetGroup)
528534
targetGroups = append(targetGroups, targetGroup)
529535
}
@@ -603,7 +609,7 @@ func (pool *WorkerPool) RunLambda(w http.ResponseWriter, r *http.Request) {
603609
smallWorker = curWorker
604610
}
605611
}
606-
if smallWorkerTask < (worker.numTask - 32) {
612+
if smallWorkerTask < (worker.numTask - 5) {
607613
worker = smallWorker
608614
}
609615

@@ -793,6 +799,16 @@ func (pool *WorkerPool) ChangeTree(tree string) {
793799
pool.updateCluster()
794800
}
795801

802+
func (pool *WorkerPool) ChangeMem(mem int) {
803+
pool.Lock()
804+
LoadWorkerMem(mem)
805+
pool.Unlock()
806+
807+
pool.Restart()
808+
809+
pool.updateCluster()
810+
}
811+
796812
func (pool *WorkerPool) ChangePolicy(policy string) {
797813
pool.Lock()
798814
if policy == "random" {
@@ -819,11 +835,17 @@ func (pool *WorkerPool) ChangePolicy(policy string) {
819835
// }
820836
loadbalancer.InitLoadBalancer(loadbalancer.KModes, loadbalancer.MaxGroup, tree_path)
821837
}
822-
if policy == "hash" {
838+
if policy == "hashfunc" {
839+
// if loadbalancer.Lb.LbType == loadbalancer.Hash {
840+
// return
841+
// }
842+
loadbalancer.InitLoadBalancer(loadbalancer.HashFunc, loadbalancer.MaxGroup, tree_path)
843+
}
844+
if policy == "hashzygote" {
823845
// if loadbalancer.Lb.LbType == loadbalancer.Hash {
824846
// return
825847
// }
826-
loadbalancer.InitLoadBalancer(loadbalancer.Hash, loadbalancer.MaxGroup, tree_path)
848+
loadbalancer.InitLoadBalancer(loadbalancer.HashZygote, loadbalancer.MaxGroup, tree_path)
827849
}
828850
pool.Unlock()
829851

src/boss/config.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type Config struct {
2424
Lb string `json:"lb"`
2525
MaxGroup int `json:"max_group"`
2626
Tree_path string `json:"tree_path"`
27+
Worker_mem int `json:"worker_mem"`
2728
}
2829

2930
func LoadDefaults() error {
@@ -33,6 +34,7 @@ func LoadDefaults() error {
3334
return err
3435
}
3536
tree_path := fmt.Sprintf("%s/default-zygote-40.json", olPath)
37+
3638
Conf = &Config{
3739
Platform: "mock",
3840
Scaling: "manual",
@@ -44,11 +46,20 @@ func LoadDefaults() error {
4446
Lb: "random",
4547
MaxGroup: 5,
4648
Tree_path: tree_path,
49+
Worker_mem: 32768,
4750
}
4851

4952
return checkConf()
5053
}
5154

55+
func Max(x int, y int) int {
56+
if x > y {
57+
return x
58+
}
59+
60+
return y
61+
}
62+
5263
// ParseConfig reads a file and tries to parse it as a JSON string to a Config
5364
// instance.
5465
func LoadConf(path string) error {
@@ -63,6 +74,7 @@ func LoadConf(path string) error {
6374
}
6475

6576
cloudvm.LoadTreePath(Conf.Tree_path)
77+
cloudvm.LoadWorkerMem(Conf.Worker_mem)
6678
if Conf.Platform == "gcp" {
6779
cloudvm.LoadGcpConfig(&Conf.Gcp)
6880
} else if Conf.Platform == "azure" {
@@ -81,8 +93,11 @@ func LoadConf(path string) error {
8193
if Conf.Lb == "kmodes" {
8294
loadbalancer.InitLoadBalancer(loadbalancer.KModes, Conf.MaxGroup, Conf.Tree_path)
8395
}
84-
if Conf.Lb == "hash" {
85-
loadbalancer.InitLoadBalancer(loadbalancer.Hash, Conf.MaxGroup, Conf.Tree_path)
96+
if Conf.Lb == "hashfunc" {
97+
loadbalancer.InitLoadBalancer(loadbalancer.HashFunc, Conf.MaxGroup, Conf.Tree_path)
98+
}
99+
if Conf.Lb == "hashzygote" {
100+
loadbalancer.InitLoadBalancer(loadbalancer.HashZygote, Conf.MaxGroup, Conf.Tree_path)
86101
}
87102

88103
return checkConf()
@@ -92,8 +107,8 @@ func checkConf() error {
92107
if Conf.Scaling != "manual" && Conf.Scaling != "threshold-scaler" {
93108
return fmt.Errorf("Scaling type '%s' not implemented", Conf.Scaling)
94109
}
95-
if Conf.Lb != "random" && Conf.Lb != "sharding" && Conf.Lb != "kmeans" && Conf.Lb != "kmodes" && Conf.Lb != "hash" {
96-
return fmt.Errorf("%s is not implemented", Conf.Scaling)
110+
if Conf.Lb != "random" && Conf.Lb != "sharding" && Conf.Lb != "kmeans" && Conf.Lb != "kmodes" && Conf.Lb != "hashfunc" && Conf.Lb != "hashzygote" {
111+
return fmt.Errorf("%s is not implemented", Conf.Lb)
97112
}
98113

99114
return nil

src/boss/loadbalancer/config.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ import (
99
)
1010

1111
const (
12-
Random = 0
13-
KMeans = 1
14-
KModes = 2
15-
Sharding = 3
16-
Hash = 4
12+
Random = 0
13+
KMeans = 1
14+
KModes = 2
15+
Sharding = 3
16+
HashZygote = 4
17+
HashFunc = 5
1718
)
1819

1920
var tree_path string
@@ -73,9 +74,12 @@ func InitLoadBalancer(lbType int, maxGroup int, path string) {
7374
log.Fatalf(err.Error())
7475
}
7576
}
76-
if lbType == Hash {
77+
if lbType == HashZygote {
7778
GetRoot()
78-
initHasher()
79+
initZygoteHasher()
80+
}
81+
if lbType == HashFunc {
82+
initFuncHasher()
7983
}
8084
}
8185
Lb = &LoadBalancer{

src/boss/loadbalancer/hashFunc.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package loadbalancer
2+
3+
import (
4+
"crypto/sha256"
5+
"math/big"
6+
)
7+
8+
func hashString(input string) int {
9+
hasherMutex.Lock() // Lock the mutex before using the hasher
10+
defer hasherMutex.Unlock() // Unlock the mutex when the function exits
11+
12+
hasher.Reset()
13+
hasher.Write([]byte(input))
14+
hashBytes := hasher.Sum(nil)
15+
16+
bigIntHash := new(big.Int).SetBytes(hashBytes).Int64()
17+
if bigIntHash < 0 {
18+
bigIntHash = -bigIntHash
19+
}
20+
return int(bigIntHash)
21+
}
22+
23+
func HashFuncGetGroup(img string, running int) int {
24+
hashInt := hashString(img)
25+
group := hashInt % running
26+
return group
27+
}
28+
29+
func initFuncHasher() {
30+
hasher = sha256.New()
31+
}

src/boss/loadbalancer/hash.go renamed to src/boss/loadbalancer/hashZygote.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ var (
1212
hasherMutex sync.Mutex
1313
)
1414

15-
func hashString(input int) int {
15+
func hashInt(input int) int {
1616
hasherMutex.Lock() // Lock the mutex before using the hasher
1717
defer hasherMutex.Unlock() // Unlock the mutex when the function exits
1818

@@ -34,13 +34,13 @@ func hashString(input int) int {
3434
return truncatedHash
3535
}
3636

37-
func HashGetGroup(pkgs []string, running int) int {
37+
func HashZygoteGetGroup(pkgs []string, running int) int {
3838
node := root.Lookup(pkgs)
39-
hashInt := hashString(node.SplitGeneration)
39+
hashInt := hashInt(node.SplitGeneration)
4040
group := hashInt % running
4141
return group
4242
}
4343

44-
func initHasher() {
44+
func initZygoteHasher() {
4545
hasher = sha256.New()
4646
}

src/worker/sandbox/evictors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (evictor *SOCKEvictor) doEvictions() {
239239
if freeSandboxes <= 0 && evictor.evicting.Len() == 0 {
240240
evictor.printf("WARNING! Critically low on memory, so evicting an active Sandbox")
241241
if evictor.prioQueues[1].Len() > 0 {
242-
// evictor.evictFront(evictor.prioQueues[1], true)
242+
//evictor.evictFront(evictor.prioQueues[1], true)
243243
}
244244
}
245245

0 commit comments

Comments
 (0)
0