8000 Release/v1.88.0 by lx1036 · Pull Request #258 · lx1036/code · GitHub
[go: up one dir, main page]

Skip to content

Release/v1.88.0 #258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
(cni)boltdb local pods
  • Loading branch information
lx1036 committed May 18, 2022
commit b1208dd260c643774dd81b4718e4469b49ff2efe
21 changes: 18 additions & 3 deletions go/k8s/network/cni/cni-on-vm/pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (server *EniBackendServer) AllocateIP(ctx context.Context, request *rpc.All
defer server.RUnlock()

// 0. Get pod Info
podInfo, err := server.k8s.GetPod(r.K8SPodNamespace, r.K8SPodName)
podInfo, err := server.k8sService.GetPod(request.K8SPodNamespace, request.K8SPodName)

// 1. Init Context
allocIPReply := &rpc.AllocateIPReply{
Expand Down Expand Up @@ -171,7 +171,7 @@ func (server *EniBackendServer) ReleaseIP(ctx context.Context, request *rpc.Rele
defer server.RUnlock()

// 0. Get pod Info
podInfo, err := server.k8s.GetPod(r.K8SPodNamespace, r.K8SPodName)
podInfo, err := server.k8sService.GetPod(request.K8SPodNamespace, request.K8SPodName)

releaseReply := &rpc.ReleaseIPReply{
Success: true,
Expand All @@ -181,7 +181,22 @@ func (server *EniBackendServer) ReleaseIP(ctx context.Context, request *rpc.Rele
}

func (server *EniBackendServer) GetIPInfo(ctx context.Context, request *rpc.GetInfoRequest) (*rpc.GetInfoReply, error) {
panic("implement me")

// 0. Get pod Info
podinfo, err := server.k8sService.GetPod(request.K8SPodNamespace, request.K8SPodName)
if err != nil {
return nil, errors.Wrapf(err, "error get pod info for: %+v", r)
}

getIPInfoResult := &rpc.GetInfoReply{IPv4: server.ipFamily.IPv4, IPv6: server.ipFamily.IPv6}

var netConf []*rpc.NetConf
// 2. return network info for pod
switch podinfo.PodNetworkType {
case podNetworkTypeENIMultiIP:

}

}

func (server *EniBackendServer) RecordEvent(ctx context.Context, request *rpc.EventRequest) (*rpc.EventReply, error) {
Expand Down
42 changes: 41 additions & 1 deletion go/k8s/network/cni/cni-on-vm/pkg/daemon/k8s.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package daemon

import (
"context"
"fmt"

"k8s-lx1036/k8s/network/cni/cni-on-vm/pkg/utils/storage"
"k8s-lx1036/k8s/network/cni/cni-on-vm/pkg/utils/types"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

Expand All @@ -10,6 +17,9 @@ import (

const (
podNetworkTypeENIMultiIP = "ENIMultiIP"

dbPath = "/var/lib/cni/pod.db"
dbName = "pods"
)

func podInfoKey(namespace, name string) string {
Expand All @@ -18,6 +28,8 @@ func podInfoKey(namespace, name string) string {

type K8sService struct {
client kubernetes.Interface

storage storage.DiskStorage
}

func newK8sServiceOrDie(kubeconfig string, daemonMode string) *K8sService {
Expand All @@ -30,9 +42,37 @@ func newK8sServiceOrDie(kubeconfig string, daemonMode string) *K8sService {
klog.Fatal(err)
}

s, err := storage.NewDiskStorage(dbName, dbPath)
if err != nil {
klog.Fatal(err)
}

k8sService := &K8sService{
client: client,
client: client,
storage: s,
}

return k8sService
}

func (k8sService *K8sService) GetPod(namespace, name string) (*types.PodInfo, error) {

pod, err := k8sService.client.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) { // fetch from local boltdb
key := podInfoKey(namespace, name)
obj, err := k8sService.storage.Get(key)
if err == nil {
item := obj.(*storage.Item)
return item.Pod, nil
}

if err != storage.ErrNotFound {
return nil, err
}
}

return nil, err
}

}
161 changes: 161 additions & 0 deletions go/k8s/network/cni/cni-on-vm/pkg/utils/storage/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package storage

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"

"k8s-lx1036/k8s/network/cni/cni-on-vm/pkg/utils/types"

bolt "go.etcd.io/bbolt"
)

var (
ErrNotFound = fmt.Errorf("not found")
)

type Item struct {
Pod *types.PodInfo
deletionTime *time.Time
}

type MemoryStorage struct {
sync.RWMutex

store map[string]*Item
}

func NewMemoryStorage() *MemoryStorage {
return &MemoryStorage{
store: make(map[string]*Item),
}
}

func (s *MemoryStorage) Put(key string, value *Item) error {
s.Lock()
defer s.Unlock()

s.store[key] = value
return nil
}

func (s *MemoryStorage) Get(key string) (*Item, error) {
s.RLock()
defer s.RUnlock()

value, ok := s.store[key]
if !ok {
return nil, ErrNotFound
}
return value, nil
}

func (s *MemoryStorage) List() ([]*Item, error) {
s.RLock()
defer s.RUnlock()

var items []*Item
for _, item := range s.store {
items = append(items, item)
}

return items, nil
}

func (s *MemoryStorage) Delete(key string) error {
s.Lock()
defer s.Unlock()

delete(s.store, key)
return nil
}

type DiskStorage struct {
name string
db *bolt.DB
memory *MemoryStorage
}

func NewDiskStorage(name string, path string) (Storage, error) {
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return nil, err
}

db, err := bolt.Open(path, 0600, nil)
if err != nil {
return nil, err
}

s := &DiskStorage{
db: db,
name: name,
memory: NewMemoryStorage(),
}
if err = s.load(); err != nil {
return nil, err
}

return s, nil
}

// load from disk into memory
func (s *DiskStorage) load() error {
err := s.db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(s.name))
return err
})
if err != nil {
return err
}

return s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(s.name))
return bucket.ForEach(func(k, v []byte) error {
var item Item
if err = json.Unmarshal(v, &item); err != nil {
return err
}
return s.memory.Put(string(k), &item)
})
})
}

func (s *DiskStorage) Put(key string, value *Item) error {
data, err := json.Marshal(value)
if err != nil {
return err
}

err = s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(s.name))
return bucket.Put([]byte(key), data)
})
if err != nil {
return err
}

return s.memory.Put(key, value)
}

func (s *DiskStorage) Get(key string) (*Item, error) {
return s.memory.Get(key)
}

func (s *DiskStorage) List() ([]*Item, error) {
return s.memory.List()
}

func (s *DiskStorage) Delete(key string) error {
err := s.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(s.name))
return bucket.Delete([]byte(key))
})
if err != nil {
return err
}

return s.memory.Delete(key)
}
5 changes: 5 additions & 0 deletions go/k8s/network/cni/cni-on-vm/pkg/utils/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,8 @@ const (
VlanStripTypeFilter = "filter"
VlanStripTypeVlan = "vlan"
)

type IPFamily struct {
IPv4 bool
IPv6 bool
}
0