@@ -489,13 +489,15 @@ func (s *Syncer) applyDerived(
489
489
local := svc .localCount
490
490
491
491
skey = getSvcKey (sname , getSvcKeyExtra (t , sinfo .ClusterIP ().String ()))
492
+ flags := uint32 (0 )
493
+
492
494
switch t {
493
- case svcTypeLoadBalancer :
494
- // Handle LB services the same as NodePort type.
495
- fallthrough
496
- case svcTypeNodePort :
495
+ case svcTypeNodePort , svcTypeLoadBalancer , svcTypeNodePortRemote :
497
496
if sinfo .NodeLocalExternal () {
498
- count = local // use only local eps
497
+ flags |= nat .NATFlgExternalLocal
498
+ }
499
+ if sinfo .NodeLocalInternal () {
500
+ flags |= nat .NATFlgInternalLocal
499
501
}
500
502
}
501
503
@@ -506,11 +508,11 @@ func (s *Syncer) applyDerived(
506
508
svc : sinfo ,
507
509
}
508
510
509
- if err := s .writeSvc (sinfo , svc .id , count , local ); err != nil {
511
+ if err := s .writeSvc (sinfo , svc .id , count , local , flags ); err != nil {
510
512
return err
511
513
}
512
514
if svcTypeLoadBalancer == t || svcTypeExternalIP == t {
513
- err := s .writeLBSrcRangeSvcNATKeys (sinfo , svc .id , count , local )
515
+ err := s .writeLBSrcRangeSvcNATKeys (sinfo , svc .id , count , local , flags )
514
516
if err != nil {
515
517
log .Debug ("Failed to write LB source range NAT keys" )
516
518
}
@@ -587,7 +589,7 @@ func (s *Syncer) apply(state DPSyncerState) error {
587
589
npInfo := serviceInfoFromK8sServicePort (sinfo )
588
590
npInfo .clusterIP = npip
589
591
npInfo .port = nport
590
- if npip .Equal (podNPIP ) && sinfo .NodeLocalExternal () {
592
+ if npip .Equal (podNPIP ) && sinfo .NodeLocalInternal () {
591
593
// do not program the meta entry, program each node
592
594
// separately
593
595
continue
@@ -598,7 +600,7 @@ func (s *Syncer) apply(state DPSyncerState) error {
598
600
continue
599
601
}
600
602
}
601
- if sinfo .NodeLocalExternal () {
603
+ if sinfo .NodeLocalInternal () {
602
604
if miss := s .expandAndApplyNodePorts (sname , sinfo , eps , nport , s .rt .Lookup ); miss != nil {
603
605
expNPMisses = append (expNPMisses , miss )
604
606
}
@@ -718,7 +720,7 @@ func (s *Syncer) updateService(skey svcKey, sinfo k8sp.ServicePort, id uint32, e
718
720
cnt ++
719
721
}
720
722
721
- if err := s .writeSvc (sinfo , id , cnt , local ); err != nil {
723
+ if err := s .writeSvc (sinfo , id , cnt , local , 0 ); err != nil {
722
724
return 0 , 0 , err
723
725
}
724
726
@@ -800,7 +802,7 @@ func getSvcNATKeyLBSrcRange(svc k8sp.ServicePort) ([]nat.FrontendKey, error) {
800
802
return keys , nil
801
803
}
802
804
803
- func (s * Syncer ) writeLBSrcRangeSvcNATKeys (svc k8sp.ServicePort , svcID uint32 , count , local int ) error {
805
+ func (s * Syncer ) writeLBSrcRangeSvcNATKeys (svc k8sp.ServicePort , svcID uint32 , count , local int , flags uint32 ) error {
804
806
var key nat.FrontendKey
805
807
affinityTimeo := uint32 (0 )
806
808
if svc .SessionAffinityType () == v1 .ServiceAffinityClientIP {
@@ -814,7 +816,7 @@ func (s *Syncer) writeLBSrcRangeSvcNATKeys(svc k8sp.ServicePort, svcID uint32, c
814
816
if err != nil {
815
817
return err
816
818
}
817
- val := nat .NewNATValue (svcID , uint32 (count ), uint32 (local ), affinityTimeo )
819
+ val := nat .NewNATValueWithFlags (svcID , uint32 (count ), uint32 (local ), affinityTimeo , flags )
818
820
for _ , key := range keys {
819
821
if log .GetLevel () >= log .DebugLevel {
820
822
log .Debugf ("bpf map writing %s:%s" , key , val )
@@ -830,7 +832,7 @@ func (s *Syncer) writeLBSrcRangeSvcNATKeys(svc k8sp.ServicePort, svcID uint32, c
830
832
return nil
831
833
}
832
834
833
- func (s * Syncer ) writeSvc (svc k8sp.ServicePort , svcID uint32 , count , local int ) error {
835
+ func (s * Syncer ) writeSvc (svc k8sp.ServicePort , svcID uint32 , count , local int , flags uint32 ) error {
834
836
key , err := getSvcNATKey (svc )
835
837
if err != nil {
836
838
return err
@@ -841,7 +843,7 @@ func (s *Syncer) writeSvc(svc k8sp.ServicePort, svcID uint32, count, local int)
841
843
affinityTimeo = uint32 (svc .StickyMaxAgeSeconds ())
842
844
}
843
845
844
- val := nat .NewNATValue (svcID , uint32 (count ), uint32 (local ), affinityTimeo )
846
+ val := nat .NewNATValueWithFlags (svcID , uint32 (count ), uint32 (local ), affinityTimeo , flags )
845
847
846
848
if log .GetLevel () >= log .DebugLevel {
847
849
log .Debugf ("bpf map writing %s:%s" , key , val )
@@ -1416,6 +1418,7 @@ func K8sSvcWithNodePort(np int) K8sServicePortOption {
1416
1418
func K8sSvcWithLocalOnly () K8sServicePortOption {
1417
1419
return func (s interface {}) {
1418
1420
s .(* serviceInfo ).nodeLocalExternal = true
1421
+ s .(* serviceInfo ).nodeLocalInternal = true
1419
1422
}
1420
1423
}
1421
1424
0 commit comments