8000 Improve caching · coder/labeler@9ba5950 · GitHub
[go: up one dir, main page]

Skip to content

Commit 9ba5950

Browse files
committed
Improve caching
1 parent 272ae2c commit 9ba5950

File tree

5 files changed

+94
-13
lines changed

5 files changed

+94
-13
lines changed

bigquery.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import "time"
55
// BqIssue represents a GitHub issue in BigQuery.
66
// The schema is defined here:
77
// https://console.cloud.google.com/bigquery?authuser=1&folder=297399687849&organizationId=867596835188&orgonly=true&project=coder-labeler&supportedpurview=organizationId&ws=!1m5!1m4!4m3!1scoder-labeler!2sghindex!3sissues.
8+
//
9+
// CREATE VECTOR INDEX my_index ON my_dataset.my_table(embedding)
10+
// OPTIONS(index_type = 'IVF', distance_type = 'COSINE',
11+
// ivf_options = '{"num_lists": 2500}')
812
type BqIssue struct {
913
ID int64 `bigquery:"id"`
1014
InstallID int64 `bigquery:"install_id"`

cmd/labeler/main.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/coder/labeler"
1919
"github.com/coder/retry"
2020
"github.com/coder/serpent"
21+
"github.com/go-chi/chi/v5"
2122
"github.com/jussi-kalliokoski/slogdriver"
2223
"github.com/lmittmann/tint"
2324
"github.com/sashabaranov/go-openai"
@@ -147,7 +148,9 @@ func main() {
147148
AppConfig: appConfig,
148149
}
149150

150-
wh.Init()
151+
mux := chi.NewMux()
152+
153+
wh.Init(mux)
151154

152155
bqClient, err := bigquery.NewClient(ctx, root.googleProjectID)
153156
if err != nil {
@@ -175,7 +178,7 @@ func main() {
175178
}
176179
}()
177180

178-
return http.Serve(listener, wh)
181+
return http.Serve(listener, mux)
179182
},
180183
Options: []serpent.Option{
181184
{

cmd/labeler/test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/coder/labeler"
1515
"github.com/coder/labeler/ghapi"
1616
"github.com/coder/serpent"
17+
"github.com/go-chi/chi/v5"
1718
"github.com/google/go-github/v59/github"
1819
)
1920

@@ -142,7 +143,8 @@ func (r *rootCmd) testCmd() *serpent.Command {
142143
Model: r.openAIModel,
143144
AppConfig: appConfig,
144145
}
145-
srv.Init()
146+
mux := chi.NewMux()
147+
srv.Init(mux)
146148

147149
instConfig, err := appConfig.InstallationConfig(installID)
148150
if err != nil {

indexer.go

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/coder/labeler/ghapi"
1414
"github.com/google/go-github/v59/github"
1515
"github.com/sashabaranov/go-openai"
16+
"google.golang.org/api/iterator"
1617
)
1718

1819
type Indexer struct {
@@ -89,6 +90,68 @@ func (s *Indexer) embedIssue(ctx context.Context, issue *github.Issue) ([]float6
8990
return f32to64(resp.Data[0].Embedding), nil
9091
}
9192

93+
func (s *Indexer) issuesTable() *bigquery.Table {
94+
return s.BigQuery.Dataset("ghindex").Table("issues")
95+
}
96+
97+
// getCachedIssues helps avoid duplicate inserts by letting the caller skip over
98+
// issues that have already been indexed.
99+
func (s *Indexer) getCachedIssues(ctx context.Context, installID int64) (map[int64]time.Time, error) {
100+
queryStr := `
101+
WITH RankedIssues AS (
102+
SELECT
103+
id,
104+
updated_at,
105+
inserted_at,
106+
ROW_NUMBER() OVER (PARTITION BY inserted_at, id ORDER BY inserted_at DESC) AS rn
107+
FROM
108+
` + "`coder-labeler.ghindex.issues`" + `
109+
WHERE install_id = @install_id
110+
)
111+
SELECT
112+
id,
113+
updated_at
114+
FROM
115+
RankedIssues
116+
WHERE
117+
rn = 1
118+
ORDER BY
119+
inserted_at DESC;
120+
`
121+
122+
q := s.BigQuery.Query(queryStr)
123+
q.Parameters = []bigquery.QueryParameter{
124+
{
125+
Name: "install_id",
126+
Value: installID,
127+
},
128+
}
129+
130+
job, err := q.Run(ctx)
131+
if err != nil {
132+
return nil, fmt.Errorf("run query: %w", err)
133+
}
134+
iter, err := job.Read(ctx)
135+
if err != nil {
136+
return nil, fmt.Errorf("read query: %w", err)
137+
}
138+
139+
issues := make(map[int64]time.Time)
140+
for {
141+
var i BqIssue
142+
err := iter.Next(&i)
143+
if err == iterator.Done {
144+
break
145+
}
146+
if err != nil {
147+
s.Log.Error("read issue", "error", err)
148+
break
149+
}
150+
issues[i.ID] = i.UpdatedAt
151+
}
152+
return issues, nil
153+
}
154+
92155
// indexInstall indexes all the issues for an installation.
93156
func (s *Indexer) indexInstall(ctx context.Context, install *github.Installation) error {
94157
idstr := fmt.Sprintf("%d", install.GetID())
@@ -116,11 +179,18 @@ func (s *Indexer) indexInstall(ctx context.Context, install *github.Installation
116179
return fmt.Errorf("list repos: %w", err)
117180
}
118181

119-
s.Log.Debug("indexing install", "id", install.GetID(), "repos", len(repos))
182+
log := s.Log.With("install", install.GetID())
183+
log.Debug("indexing install", "repos", len(repos))
120184

121-
table := s.BigQuery.Dataset("ghindex").Table("issues")
185+
table := s.issuesTable()
122186
inserter := table.Inserter()
123187

188+
cachedIssues, err := s.getCachedIssues(ctx, install.GetID())
189+
if err != nil {
190+
return fmt.Errorf("get cached issues: %w", err)
191+
}
192+
log.Debug("got cached issues", "count", len(cachedIssues))
193+
124194
for _, repo := range repos {
125195
// List all issues
126196
issues, err := ghapi.Page(ctx,
@@ -143,6 +213,12 @@ func (s *Indexer) indexInstall(ctx context.Context, install *github.Installation
143213
log.Debug("found issues", "count", len(issues))
144214

145215
for _, issue := range issues {
216+
if uat, ok := cachedIssues[issue.GetID()]; ok {
217+
if issue.UpdatedAt.Time.Equal(uat) {
218+
log.Debug("skipping issue due to cache", "num", issue.GetNumber())
219+
continue
220+
}
221+
}
146222
emb, err := s< 10000 /span>.embedIssue(ctx, issue)
147223
if err != nil {
148224
return fmt.Errorf("embed issue %v: %w", issue.ID, err)
@@ -168,7 +244,7 @@ func (s *Indexer) indexInstall(ctx context.Context, install *github.Installation
168244
log.Debug("indexed issue", "num", issue.GetNumber())
169245
}
170246
}
171-
247+
log.Debug("finished indexing")
172248
return nil
173249
}
174250

@@ -187,7 +263,7 @@ func (s *Indexer) runIndex(ctx context.Context) error {
187263

188264
// Run starts the indexer and blocks until it's done.
189265
func (s *Indexer) Run(ctx context.Context) error {
190-
ticker := time.NewTicker(time.Minute)
266+
ticker := time.NewTicker(time.Second * 10)
191267
defer ticker.Stop()
192268
for {
193269
err := s.runIndex(ctx)

webhook.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ type Webhook struct {
4444
recentIssuesCache *tlru.Cache[repoAddr, []*github.Issue]
4545
}
4646

47-
func (s *Webhook) Init() {
48-
s.router = chi.NewRouter()
47+
func (s *Webhook) Init(r *chi.Mux) {
48+
s.router = r
4949
s.router.Mount("/infer", httpjson.Handler(s.infer))
5050
s.router.Mount("/webhook", httpjson.Handler(s.webhook))
5151

@@ -359,7 +359,3 @@ func (s *Webhook) webhook(w http.ResponseWriter, r *http.Request) *httpjson.Resp
359359
Body: httpjson.M{"message": "labels set", "labels": resp.SetLabels},
360360
}
361361
}
362-
363-
func (s *Webhook) ServeHTTP(w http.ResponseWriter, r *http.Request) {
364-
s.router.ServeHTTP(w, r)
365-
}

0 commit comments

Comments
 (0)
0