-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlast_commit.diff
More file actions
132 lines (126 loc) · 5.44 KB
/
last_commit.diff
File metadata and controls
132 lines (126 loc) · 5.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
diff --git a/internal/cmd/cmd_set.go b/internal/cmd/cmd_set.go
index 7a2bbe7..d6d8345 100644
--- a/internal/cmd/cmd_set.go
+++ b/internal/cmd/cmd_set.go
@@ -14,6 +14,8 @@ import (
"github.com/sevenDatabase/SevenDB/internal/shardmanager"
dstore "github.com/sevenDatabase/SevenDB/internal/store"
"github.com/sevenDatabase/SevenDB/internal/types"
+ "github.com/sevenDatabase/SevenDB/internal/raft"
+ "context"
)
const (
@@ -220,8 +222,19 @@ func evalSET(c *Cmd, s *dstore.Store) (*CmdRes, error) {
}
func executeSET(c *Cmd, sm *shardmanager.ShardManager) (*CmdRes, error) {
- if len(c.C.Args) <= 1 {
- return SETResNilRes, errors.ErrWrongArgumentCount("SET")
+ if len(c.C.Args) <= 1 { return SETResNilRes, errors.ErrWrongArgumentCount("SET") }
+ if sm != nil && sm.RaftEnabled() {
+ sh := sm.GetShardForKey(c.C.Args[0])
+ rn := sm.RaftNode(sh.ID)
+ if rn != nil { // raft path
+ if !rn.IsLeader() { return SETResNilRes, errors.ErrGeneral("not leader") }
+ rec, err := raft.BuildReplicationRecord("default", "SET", c.C.Args)
+ if err != nil { return SETResNilRes, err }
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+ if _, _, err := rn.ProposeAndWait(ctx, rec); err != nil { return SETResNilRes, err }
+ return SETResOKRes, nil
+ }
}
shard := sm.GetShardForKey(c.C.Args[0])
return evalSET(c, shard.Thread.Store())
diff --git a/internal/shardmanager/main.go b/internal/shardmanager/main.go
index 2386fec..ea25b8f 100644
--- a/internal/shardmanager/main.go
+++ b/internal/shardmanager/main.go
@@ -14,6 +14,7 @@ import (
"sync"
"syscall"
"time"
+ "strconv"
"github.com/cespare/xxhash/v2"
"github.com/sevenDatabase/SevenDB/config"
@@ -22,6 +23,7 @@ import (
"github.com/sevenDatabase/SevenDB/internal/shard"
"github.com/sevenDatabase/SevenDB/internal/shardthread"
"github.com/sevenDatabase/SevenDB/internal/store"
+ "github.com/sevenDatabase/SevenDB/internal/object"
)
type ShardManager struct {
@@ -81,6 +83,26 @@ func NewShardManager(shardCount int, globalErrorChan chan error) *ShardManager {
slog.Error("failed to start raft for shard", slog.Int("shard", i), slog.Any("error", err))
continue
}
+ // Register replication handler: apply committed SET semantics to underlying store.
+ // Strict=true to avoid silent divergence if application apply fails.
+ rn.SetReplicationHandler(func(pl *raft.ReplicationPayload) error {
+ // Single-shard mapping: choose shard based on first arg (key) consistent with GetShardForKey.
+ if pl == nil || len(pl.Args) == 0 { return fmt.Errorf("empty replication payload args") }
+ key := pl.Args[0]
+ sh := sm.GetShardForKey(key)
+ st := sh.Thread.Store()
+ switch pl.Cmd {
+ case "SET":
+ if len(pl.Args) < 2 { return fmt.Errorf("SET requires key value") }
+ value := pl.Args[1]
+ // Minimal semantics: always overwrite (no options yet). Future: embed options in payload.
+ obj := createObjFromPlain(value, st)
+ st.Put(key, obj)
+ default:
+ return fmt.Errorf("unsupported replicated cmd: %s", pl.Cmd)
+ }
+ return nil
+ }, true)
sm.raftNodes[i] = rn
}
slog.Info("raft enabled for shards", slog.Int("count", shardCount), slog.Uint64("local_id", sm.localRaftID))
@@ -109,6 +131,15 @@ func NewShardManager(shardCount int, globalErrorChan chan error) *ShardManager {
return sm
}
+// createObjFromPlain attempts to parse integer/float then falls back to string.
+// This mirrors logic in cmd layer's CreateObjectFromValue but avoids import cycles.
+func createObjFromPlain(val string, st *store.Store) *object.Obj {
+ // This local helper can't reference cmd package to avoid cycles; replicate minimal logic.
+ if i, err := strconv.ParseInt(val, 10, 64); err == nil { return st.NewObj(i, -1, object.ObjTypeInt) }
+ if f, err := strconv.ParseFloat(val, 64); err == nil { return st.NewObj(f, -1, object.ObjTypeFloat) }
+ return st.NewObj(val, -1, object.ObjTypeString)
+}
+
// Run starts the ShardManager, manages its lifecycle, and listens for errors.
func (manager *ShardManager) Run(ctx context.Context) {
signal.Notify(manager.sigChan, syscall.SIGINT, syscall.SIGTERM)
@@ -259,6 +290,15 @@ func (manager *ShardManager) Shards() []*shard.Shard {
return manager.shards
}
+// RaftEnabled returns true if raft has been initialized for shards.
+func (manager *ShardManager) RaftEnabled() bool { return manager != nil && len(manager.raftNodes) > 0 && manager.raftNodes[0] != nil }
+
+// RaftNode returns the raft node for a shard index (or nil if absent/out of range).
+func (manager *ShardManager) RaftNode(idx int) *raft.ShardRaftNode {
+ if manager == nil || idx < 0 || idx >= len(manager.raftNodes) { return nil }
+ return manager.raftNodes[idx]
+}
+
// BucketLogFor returns a bucket log implementation for the shard index. When raft is enabled
// it returns a RaftBucketLog; otherwise a file-backed log. Errors are logged and a nil may be returned.
func (manager *ShardManager) BucketLogFor(shardIdx int) bucket.BucketLog {
diff --git a/raftdata/a/entries.log b/raftdata/a/entries.log
index c72618c..6c8f77d 100644
--- a/raftdata/a/entries.log
+++ b/raftdata/a/entries.log
@@ -3,3 +3,4 @@ CAAQAhgC
CAAQAxgD
CAAQBBgE
CAAQBRgF
+CAAQBhgG
diff --git a/raftdata/a/hardstate.json b/raftdata/a/hardstate.json
index 0bbfbed..732ec5b 100644
--- a/raftdata/a/hardstate.json
+++ b/raftdata/a/hardstate.json
@@ -1 +1 @@
-{"term":5,"vote":1,"commit":5}
\ No newline at end of file
+{"term":6,"vote":1,"commit":6}
\ No newline at end of file