@@ -13,6 +13,7 @@ import (
13
13
"github.com/coder/labeler/ghapi"
14
14
"github.com/google/go-github/v59/github"
15
15
"github.com/sashabaranov/go-openai"
16
+ "google.golang.org/api/iterator"
16
17
)
17
18
18
19
type Indexer struct {
@@ -89,6 +90,68 @@ func (s *Indexer) embedIssue(ctx context.Context, issue *github.Issue) ([]float6
89
90
return f32to64 (resp .Data [0 ].Embedding ), nil
90
91
}
91
92
93
+ func (s * Indexer ) issuesTable () * bigquery.Table {
94
+ return s .BigQuery .Dataset ("ghindex" ).Table ("issues" )
95
+ }
96
+
97
+ // getCachedIssues helps avoid duplicate inserts by letting the caller skip over
98
+ // issues that have already been indexed.
99
+ func (s * Indexer ) getCachedIssues (ctx context.Context , installID int64 ) (map [int64 ]time.Time , error ) {
100
+ queryStr := `
101
+ WITH RankedIssues AS (
102
+ SELECT
103
+ id,
104
+ updated_at,
105
+ inserted_at,
106
+ ROW_NUMBER() OVER (PARTITION BY inserted_at, id ORDER BY inserted_at DESC) AS rn
107
+ FROM
108
+ ` + "`coder-labeler.ghindex.issues`" + `
109
+ WHERE install_id = @install_id
110
+ )
111
+ SELECT
112
+ id,
113
+ updated_at
114
+ FROM
115
+ RankedIssues
116
+ WHERE
117
+ rn = 1
118
+ ORDER BY
119
+ inserted_at DESC;
120
+ `
121
+
122
+ q := s .BigQuery .Query (queryStr )
123
+ q .Parameters = []bigquery.QueryParameter {
124
+ {
125
+ Name : "install_id" ,
126
+ Value : installID ,
127
+ },
128
+ }
129
+
130
+ job , err := q .Run (ctx )
131
+ if err != nil {
132
+ return nil , fmt .Errorf ("run query: %w" , err )
133
+ }
134
+ iter , err := job .Read (ctx )
135
+ if err != nil {
136
+ return nil , fmt .Errorf ("read query: %w" , err )
137
+ }
138
+
139
+ issues := make (map [int64 ]time.Time )
140
+ for {
141
+ var i BqIssue
142
+ err := iter .Next (& i )
143
+ if err == iterator .Done {
144
+ break
145
+ }
146
+ if err != nil {
147
+ s .Log .Error ("read issue" , "error" , err )
148
+ break
149
+ }
150
+ issues [i .ID ] = i .UpdatedAt
151
+ }
152
+ return issues , nil
153
+ }
154
+
92
155
// indexInstall indexes all the issues for an installation.
93
156
func (s * Indexer ) indexInstall (ctx context.Context , install * github.Installation ) error {
94
157
idstr := fmt .Sprintf ("%d" , install .GetID ())
@@ -116,11 +179,18 @@ func (s *Indexer) indexInstall(ctx context.Context, install *github.Installation
116
179
return fmt .Errorf ("list repos: %w" , err )
117
180
}
118
181
119
- s .Log .Debug ("indexing install" , "id" , install .GetID (), "repos" , len (repos ))
182
+ log := s .Log .With ("install" , install .GetID ())
183
+ log .Debug ("indexing install" , "repos" , len (repos ))
120
184
121
- table := s .BigQuery . Dataset ( "ghindex" ). Table ( "issues" )
185
+ table := s .issuesTable ( )
122
186
inserter := table .Inserter ()
123
187
188
+ cachedIssues , err := s .getCachedIssues (ctx , install .GetID ())
189
+ if err != nil {
190
+ return fmt .Errorf ("get cached issues: %w" , err )
191
+ }
192
+ log .Debug ("got cached issues" , "count" , len (cachedIssues ))
193
+
124
194
for _ , repo := range repos {
125
195
// List all issues
126
196
issues , err := ghapi .Page (ctx ,
@@ -143,6 +213,12 @@ func (s *Indexer) indexInstall(ctx context.Context, install *github.Installation
143
213
log .Debug ("found issues" , "count" , len (issues ))
144
214
145
215
for _ , issue := range issues {
216
+ if uat , ok := cachedIssues [issue .GetID ()]; ok {
217
+ if issue .UpdatedAt .Time .Equal (uat ) {
218
+ log .Debug ("skipping issue due to cache" , "num" , issue .GetNumber ())
219
+ continue
220
+ }
221
+ }
146
222
emb , err := s .embedIssue (ctx , issue )
147
223
if err != nil {
148
224
return fmt .Errorf ("embed issue %v: %w" , issue .ID , err )
@@ -168,7 +244,7 @@ func (s *Indexer) indexInstall(ctx context.Context, install *github.Installation
168
244
log .Debug ("indexed issue" , "num" , issue .GetNumber ())
169
245
}
170
246
}
171
-
247
+ log . Debug ( "finished indexing" )
172
248
return nil
173
249
}
174
250
@@ -187,7 +263,7 @@ func (s *Indexer) runIndex(ctx context.Context) error {
187
263
188
264
// Run starts the indexer and blocks until it's done.
189
265
func (s * Indexer ) Run (ctx context.Context ) error {
190
- ticker := time .NewTicker (time .Minute )
266
+ ticker := time .NewTicker (time .Second * 10 )
191
267
defer ticker .Stop ()
192
268
for {
193
269
err := s .runIndex (ctx )
0 commit comments