8000 hash lb · open-lambda/open-lambda@ac84f04 · GitHub
[go: up one dir, main page]

Skip to content

Commit ac84f04

Browse files
committed
hash lb
1 parent 89424ed commit ac84f04

File tree

5 files changed

+67
-4
lines changed

5 files changed

+67
-4
lines changed

boss.json.overrides

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
"machine_type": "e2-medium"
2121
},
2222
"lb": "random",
23-
"max_group": 2,
23+
"max_group": 5,
2424
"platform": "azure",
2525
"scaling": "manual",
2626
"tree_path": "/home/azureuser/paper-tree-cache/analysis/17/trees/tree-v2.node-640.json",

src/boss/cloudvm/worker.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,8 @@ func (pool *WorkerPool) RunLambda(w http.ResponseWriter, r *http.Request) {
520520
}
521521

522522
if loadbalancer.Lb.LbType == loadbalancer.Hash {
523-
targetGroup = loadbalancer.HashGetGroup(img)
523+
targetGroup = loadbalancer.HashGetGroup(img, len(pool.workers[RUNNING]))
524+
// fmt.Println(targetGroup)
524525
targetGroups = append(targetGroups, targetGroup)
525526
}
526527

@@ -778,6 +779,9 @@ func (pool *WorkerPool) Restart() {
778779

779780
func (pool *WorkerPool) ChangeTree(tree string) {
780781
pool.Lock()
782+
// if tree_path == tree {
783+
// return
784+
// }
781785
loadbalancer.InitLoadBalancer(loadbalancer.Lb.LbType, loadbalancer.MaxGroup, tree)
782786
pool.Unlock()
783787

@@ -789,18 +793,33 @@ func (pool *WorkerPool) ChangeTree(tree string) {
789793
func (pool *WorkerPool) ChangePolicy(policy string) {
790794
pool.Lock()
791795
if policy == "random" {
796+
// if loadbalancer.Lb.LbType == loadbalancer.Random {
797+
// return
798+
// }
792799
loadbalancer.InitLoadBalancer(loadbalancer.Random, loadbalancer.MaxGroup, tree_path)
793800
}
794801
if policy == "sharding" {
802+
// if loadbalancer.Lb.LbType == loadbalancer.Sharding {
803+
// return
804+
// }
795805
loadbalancer.InitLoadBalancer(loadbalancer.Sharding, loadbalancer.MaxGroup, tree_path)
796806
}
797807
if policy == "kmeans" {
808+
// if loadbalancer.Lb.LbType == loadbalancer.KMeans {
809+
// return
810+
// }
798811
loadbalancer.InitLoadBalancer(loadbalancer.KMeans, loadbalancer.MaxGroup, tree_path)
799812
}
800813
if policy == "kmodes" {
814+
// if loadbalancer.Lb.LbType == loadbalancer.KModes {
815+
// return
816+
// }
801817
loadbalancer.InitLoadBalancer(loadbalancer.KModes, loadbalancer.MaxGroup, tree_path)
802818
}
803819
if policy == "hash" {
820+
// if loadbalancer.Lb.LbType == loadbalancer.Hash {
821+
// return
822+
// }
804823
loadbalancer.InitLoadBalancer(loadbalancer.Hash, loadbalancer.MaxGroup, tree_path)
805824
}
806825
pool.Unlock()
@@ -812,14 +831,17 @@ func (pool *WorkerPool) ChangePolicy(policy string) {
812831

813832
// forward request to worker
814833
// TODO: this is kept for other platforms
834+
var client = &http.Client{
835+
Timeout: time.Second * 10,
836+
}
837+
815838
func forwardTaskHelper(w http.ResponseWriter, req *http.Request, workerIp string) error {
816839
host := fmt.Sprintf("%s:%d", workerIp, 5000) //TODO: read from config
817840
req.URL.Scheme = "http"
818841
req.URL.Host = host
819842
req.Host = host
820843
req.RequestURI = ""
821844

822-
client := http.Client{}
823845
resp, err := client.Do(req)
824846
if err != nil {
825847
http.Error(w, err.Error(), http.StatusBadGateway)

src/boss/loadbalancer/hash.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package loadbalancer
2+
3+
import (
4+
"crypto/sha256"
5+
"hash"
6+
"math/big"
7+
"sync"
8+
)
9+
10+
var (
11+
hasher hash.Hash
12+
hasherMutex sync.Mutex
13+
)
14+
15+
func hashString(input string) int {
16+
hasherMutex.Lock() // Lock the mutex before using the hasher
17+
defer hasherMutex.Unlock() // Unlock the mutex when the function exits
18+
19+
hasher.Reset()
20+
hasher.Write([]byte(input))
21+
hashBytes := hasher.Sum(nil)
22+
23+
bigIntHash := new(big.Int).SetBytes(hashBytes).Int64()
24+
if bigIntHash < 0 {
25+
bigIntHash = -bigIntHash
26+
}
27+
return int(bigIntHash)
28+
}
29+
30+
func HashGetGroup(img string, running int) int {
31+
hashInt := hashString(img)
32+
group := hashInt % running
33+
return group
34+
}
35+
36+
func initHasher() {
37+
hasher = sha256.New()
38+
}

src/common/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func LoadDefaults(olPath string) error {
148148
}
149149
// totalMb := uint64(in.Totalram) * uint64(in.Unit) / 1024 / 1024
150150
// memPoolMb := Max(int(totalMb-500), 500)
151-
memPoolMb := 25600
151+
memPoolMb := 23000
152152

153153
Conf = &Config{
154154
Worker_dir: workerDir,

src/worker/server/lambdaServer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ func (s *LambdaServer) RunLambda(w http.ResponseWriter, r *http.Request) {
5959
if len(urlParts) == 2 {
6060
img := urlParts[1]
6161
s.lambdaMgr.Get(img).Invoke(w, r)
62+
// time.Sleep(3 * time.Millisecond)
63+
// w.WriteHeader(http.StatusOK)
64+
// w.Write([]byte("Place Holder, testing boss throughput limit"))
6265
} else {
6366
w.WriteHeader(http.StatusInternalServerError)
6467
w.Write([]byte("expected invocation format: /run/<lambda-name>"))

0 commit comments

Comments
 (0)
0