8000 Release/v1.88.0 by lx1036 · Pull Request #258 · lx1036/code · GitHub
[go: up one dir, main page]

Skip to content

Release/v1.88.0 #258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
(cni)save map[podInfo]ENI relationship to boltdb
  • Loading branch information
lx1036 committed May 25, 2022
commit 9e888237f471d7341aad66d2a1bafb1e8ca22ad9
85 changes: 83 additions & 2 deletions go/k8s/network/cni/cni-on-vm/pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,30 @@ package daemon

import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"k8s-lx1036/k8s/network/cni/cni-on-vm/pkg/ipam"
"k8s-lx1036/k8s/network/cni/cni-on-vm/pkg/utils/storage"
"k8s.io/klog/v2"
"os"
"sync"

"k8s-lx1036/k8s/network/cni/cni-on-vm/pkg/rpc"
"k8s-lx1036/k8s/network/cni/cni-on-vm/pkg/utils/types"
)

// TODO: 把 map[PodInfo]ENI 信息存入 pods boltdb 中!!!

const (
daemonModeVPC = "VPC"
daemonModeENIMultiIP = "ENIMultiIP"
daemonModeENIOnly = "ENIOnly"

cniDefaultPath = "/opt/cni/bin"

dbPath = "/var/lib/cni/pods.db"
dbName = "pods"
)

// ResourceManager Allocate/Release/Pool/Stick/GC pod resource
Expand Down Expand Up @@ -49,7 +58,8 @@ type EniBackendServer struct {
ipFamily *types.IPFamily

pendingPods sync.Map // 并发安全的 map

storage *storage.DiskStorage
ipam ipam.API
}

func newEniBackendServer(daemonMode, configFilePath, kubeconfig string) (rpc.EniBackendServer, error) {
Expand All @@ -72,6 +82,32 @@ func newEniBackendServer(daemonMode, configFilePath, kubeconfig string) (rpc.Eni
return nil, err
}

server.storage, err = storage.NewDiskStorage(dbName, dbPath, json.Marshal, func(bytes []byte) (interface{}, error) {
var podResource types.PodResources
if err := json.Unmarshal(bytes, &podResource); err != nil {
return nil, err
}
return podResource, nil
})
if err != nil {
return nil, err
}
server.restoreLocalENI()
eniList, err := server.storage.List()
if err != nil {
return nil, err
}
localResource := make(map[string]types.PodResource)
for _, obj := range eniList {
podResource := obj.(types.PodResources)
for _, resource := range podResource.Resources {
localResource[resource.ID] = types.PodResource{
Resource: resource,
PodInfo: podResource.PodInfo,
}
}
}

server.enableTrunk = daemonConfig.EnableENITrunking

switch daemonMode {
Expand All @@ -84,7 +120,7 @@ func newEniBackendServer(daemonMode, configFilePath, kubeconfig string) (rpc.Eni
var err error
switch daemonMode {
case daemonModeENIMultiIP:
server.eniIPResMgr, err = newENIIPResourceManager(poolConfig, ecs, server.k8s, localResource[types.ResourceTypeENI])
server.eniIPResMgr, err = newENIIPResourceManager(poolConfig, ecs, server.k8s, localResource)
if err != nil {
return nil, err
}
Expand All @@ -94,6 +130,41 @@ func newEniBackendServer(daemonMode, configFilePath, kubeconfig string) (rpc.Eni
return server, nil
}

// restore if local is empty
func (server *EniBackendServer) restoreLocalENI() error {
items, err := server.storage.List()
if err != nil {
return err
}
if len(items) != 0 {
klog.Warningf(fmt.Sprintf("local db is not empty, skip restore"))
return nil
}

eniList, err := server.ipam.GetAttachedENIs(context.TODO(), false)
if err != nil {
return err
}
ipENIMap := make(map[string]*types.ENI)
for _, eni := range eniList {
ipENIMap[eni.PrimaryIP.IPv4.String()] = eni
}

podsInfo, err := server.k8sService.GetLocalPods()
if err != nil {
return err
}
for _, podInfo := range podsInfo {
if eni, ok := ipENIMap[podInfo.PodIPs.IPv4.String()]; ok {
server.storage.Put(podInfoKey(podInfo.Namespace, podInfo.Name), types.PodResources{
PodInfo: podInfo,
Resources: eni.ToResItems(),
})
}
}

}

func (server *EniBackendServer) AllocateIP(ctx context.Context, request *rpc.AllocateIPRequest) (*rpc.AllocateIPReply, error) {

server.RLock()
Expand Down Expand Up @@ -178,6 +249,12 @@ func (server *EniBackendServer) ReleaseIP(ctx context.Context, request *rpc.Rele
IPv4: true,
}

podKey := fmt.Sprintf("%s/%s")
oldRes, err := server.getPodResource(podInfoKey(request.K8SPodNamespace, request.K8SPodName))
if err != nil {
return nil, err
}

}

func (server *EniBackendServer) GetIPInfo(ctx context.Context, request *rpc.GetInfoRequest) (*rpc.GetInfoReply, error) {
Expand Down Expand Up @@ -250,6 +327,10 @@ func (server *EniBackendServer) RecordEvent(ctx context.Context, request *rpc.Ev
panic("implement me")
}

func (server *EniBackendServer) getPodResource(key string) (types.PodResources, error) {

}

func defaultForNetConf(netConf []*rpc.NetConf) error {
// ignore netConf check
if len(netConf) == 0 {
Expand Down
26 changes: 23 additions & 3 deletions go/k8s/network/cni/cni-on-vm/pkg/daemon/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/kubernetes/pkg/util/bandwidth"
"strconv"
"time"
Expand All @@ -19,11 +20,17 @@ import (
"k8s.io/client-go/tools/clientcmd"
)

// TODO: K8sService 里无需 db 保存 PodInfo

// INFO: 这里把 Pod -> convertPod() -> PodInfo 然后存入 pods boltdb 中,这样在处理 delete pod 时(无法通过k8sClient获取Pod)可以获取到 PodInfo,
// 为此,还需要异步清理 pods boltdb.
// 处理 delete pod 时从db中获取到 PodInfo,

const (
podNetworkTypeENIMultiIP = "ENIMultiIP"

dbPath = "/var/lib/cni/pod.db"
dbName = "pods"
//dbPath = "/var/lib/cni/pod.db"
//dbName = "pods"

defaultStickTimeForSts = 5 * time.Minute

Expand All @@ -43,7 +50,8 @@ func podInfoKey(namespace, name string) string {
}

type K8sService struct {
client kubernetes.Interface
nodeName string
client kubernetes.Interface

storage storage.DiskStorage

Expand Down Expand Up @@ -100,6 +108,18 @@ func (k8sService *K8sService) GetPod(namespace, name string) (*types.PodInfo, er
return podInfo, nil
}

func (k8sService *K8sService) GetLocalPods() ([]*types.PodInfo, error) {
podList, err := k8sService.client.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("spec.nodeName", k8sService.nodeName).String(),
})
if err != nil {

}
for _, item := range podList.Items {
convertPod()
}
}

func convertPod(daemonMode string, pod *corev1.Pod) *types.PodInfo {
podInfo := &types.PodInfo{
Name: pod.Name,
Expand Down
49 changes: 28 additions & 21 deletions go/k8s/network/cni/cni-on-vm/pkg/utils/storage/storage.go
< 10000 td id="diff-dc4f8a906b8f30f40a6591f8c79e673832d373fe868f800e225d373f77a5788dL42" data-line-number="42" class="blob-num blob-num-context js-linkable-line-number">
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package storage

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand All @@ -25,24 +24,24 @@ type Item struct {
type MemoryStorage struct {
sync.RWMutex

store map[string]*Item
store map[string]interface{}
}

func NewMemoryStorage() *MemoryStorage {
return &MemoryStorage{
store: make(map[string]*Item),
store: make(map[string]interface{}),
}
}

func (s *MemoryStorage) Put(key string, value *Item) error {
func (s *MemoryStorage) Put(key string, value interface{}) error {
s.Lock()
defer s.Unlock()

s.store[key] = value
return nil
}

func (s *MemoryStorage) Get(key string) (*Item, error) {
func (s *MemoryStorage) Get(key string) (interface{}, error) {
s.RLock()
defer s.RUnlock()

Expand All @@ -53,11 +52,11 @@ func (s *MemoryStorage) Get(key string) (*Item, error) {
return value, nil
}

func (s *MemoryStorage) List() ([]*Item, error) {
func (s *MemoryStorage) List() ([]interface{}, error) {
s.RLock()
defer s.RUnlock()

var items []*Item
var items []interface{}
for _, item := range s.store {
items = append(items, item)
}
Expand All @@ -73,13 +72,18 @@ func (s *MemoryStorage) Delete(key string) error {
return nil
}

type Serializer func(interface{}) ([]byte, error)
type Deserializer func([]byte) (interface{}, error)

type DiskStorage struct {
name string
db *bolt.DB
memory *MemoryStorage
name string
db *bolt.DB
memory *MemoryStorage
serializer Serializer
deserializer Deserializer
}

func NewDiskStorage(name string, path string) (Storage, error) {
func NewDiskStorage(name string, path string, serializer Serializer, deserializer Deserializer) (*DiskStorage, error) {
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
return nil, err
}
Expand All @@ -90,9 +94,11 @@ func NewDiskStorage(name string, path string) (Storage, error) {
}

s := &DiskStorage{
db: db,
name: name,
memory: NewMemoryStorage(),
db: db,
name: name,
memory: NewMemoryStorage(),
serializer: serializer,
deserializer: deserializer,
}
if err = s.load(); err != nil {
return nil, err
Expand All @@ -114,17 +120,18 @@ func (s *DiskStorage) load() error {
return s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(s.name))
return bucket.ForEach(func(k, v []byte) error {
var item Item
if err = json.Unmarshal(v, &item); err != nil {
obj, err := s.deserializer(v)
if err != nil {
return err
}
return s.memory.Put(string(k), &item)

return s.memory.Put(string(k), obj)
})
})
}

func (s *DiskStorage) Put(key string, value *Item) error {
data, err := json.Marshal(value)
func (s *DiskStorage) Put(key string, value interface{}) error {
data, err := s.serializer(value)
if err != nil {
return err
}
Expand All @@ -140,11 +147,11 @@ func (s *DiskStorage) Put(key string, value *Item) error {
return s.memory.Put(key, value)
}

func (s *DiskStorage) Get(key string) (*Item, error) {
func (s *DiskStorage) Get(key string) (interface{}, error) {
return s.memory.Get(key)
}

func (s *DiskStorage) List() ([]*Item, error) {
func (s *DiskStorage) List() ([]interface{}, error) {
return s.memory.List()
}

Expand Down
31 changes: 31 additions & 0 deletions go/k8s/network/cni/cni-on-vm/pkg/utils/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ func (i *IPSet) SetIP(str string) *IPSet {
return i
}

func (i *IPSet) GetIPv4() string {
return i.IPv4.String()
}

type ENI struct {
ID string
MAC string
Expand All @@ -105,6 +109,23 @@ func (e *ENI) GetResourceID() string {
return e.MAC
}

func (e *ENI) GetType() string {
return ResourceTypeENI
}

func (e *ENI) ToResItems() []ResourceItem {
return []ResourceItem{
{
Type: e.GetType(),
ID: e.GetResourceID(),
ENIID: e.ID,
ENIMAC: e.MAC,
IPv4: e.PrimaryIP.GetIPv4(),
//IPv6: e.PrimaryIP.GetIPv6(),
},
}
}

type ENIIP struct {
ENI *ENI
IPSet IPSet
Expand Down Expand Up @@ -234,6 +255,16 @@ func PodInfoKey(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

type PodResources struct {
Resources []ResourceItem
PodInfo *PodInfo
}

type PodResource struct {
Resource ResourceItem
PodInfo *PodInfo
}

type SetupConfig struct {
HostVethIfName string
HostIPSet *IPNetSet
Expand Down
0