-
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(core): Add Supabase Queues support #15921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
size-limit report 📦
|
bbadd60
to
e7b3370
Compare
1bc2897
to
d387514
Compare
e7b3370
to
56a2d84
Compare
56a2d84
to
13d60e0
Compare
423397d
to
556703c
Compare
13d60e0
to
74869e4
Compare
e63915a
to
719c8b6
Compare
}, | ||
}, | ||
async span => { | ||
return (Reflect.apply(target, thisArg, argumentsList) as Promise<unknown>).then((res: unknown) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably also end the span when it throws/rejects? We can also set the status of the span then.
name: 'supabase.db.rpc', | ||
attributes: { | ||
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase', | ||
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: op, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can add the messaging.system
attribute to be 'supabase'
as described in https://develop.sentry.dev/sdk/telemetry/traces/modules/queues/
const isProducerSpan = argumentsList[0] === 'enqueue'; | ||
const isConsumerSpan = argumentsList[0] === 'dequeue'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if this recently changed, but here they show send
and pop
as rpc args: https://supabase.com/docs/guides/queues/quickstart#enqueueing-and-dequeueing-messages 🤔
fb8eeb1
to
a8da234
Compare
ff2f804
to
5ce5ee9
Compare
@sentry review |
On it! We are reviewing the PR and will provide feedback shortly. |
PR DescriptionThis pull request introduces instrumentation for Supabase queue operations using pgmq, enabling Sentry to capture spans and breadcrumbs for queue publishing and processing. This provides visibility into asynchronous task execution within Supabase applications. Click to see moreKey Technical ChangesThe key technical changes include: 1) Instrumenting the Architecture DecisionsThe primary architectural decision involves using Proxy objects to intercept calls to the Supabase client's Dependencies and InteractionsThis integration depends on the Risk ConsiderationsPotential risks include: 1) Performance overhead due to the instrumentation, although proxies are generally performant. 2) Incorrectly identifying queue operations, leading to spurious spans. 3) Failure to propagate trace context if consumers are not properly instrumented. 4) Security implications of modifying message bodies, although the injected data is limited to Sentry trace context. 5) The modification of the arguments list in place could lead to unexpected side effects. Notable Implementation DetailsNotable implementation details include: 1) The use of |
|
||
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { | ||
if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) { | ||
return; | ||
} | ||
|
||
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy( | ||
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema, | ||
{ | ||
apply(target, thisArg, argumentsList) { | ||
const supabaseInstance = Reflect.apply(target, thisArg, argumentsList); | ||
|
||
(supabaseInstance as unknown as SupabaseClientConstructor).rpc = new Proxy( | ||
(supabaseInstance as unknown as SupabaseClientInstance).rpc, | ||
{ | ||
apply(target, thisArg, argumentsList) { | ||
const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; | ||
const isConsumerSpan = argumentsList[0] === 'pop'; | ||
|
||
if (!isProducerSpan && !isConsumerSpan) { | ||
return Reflect.apply(target, thisArg, argumentsList); | ||
} | ||
|
||
if (isProducerSpan) { | ||
return instrumentRpcProducer(target, thisArg, argumentsList); | ||
} else if (isConsumerSpan) { | ||
return instrumentRpcConsumer(target, thisArg, argumentsList); | ||
} | ||
|
||
// If the operation is not a queue operation, return the original function | ||
return Reflect.apply(target, thisArg, argumentsList); | ||
}, | ||
}, | ||
); | ||
|
||
return supabaseInstance; | ||
}, | ||
}, | ||
); | ||
|
||
markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema); | ||
} | ||
|
||
function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?: string; baggage?: string } }): { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function instrumentRpcReturnedFromSchemaCall
has deeply nested proxy wrapping which makes it hard to follow and maintain. Consider extracting the inner proxy logic into a separate function for better readability.
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { | |
if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) { | |
return; | |
} | |
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy( | |
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema, | |
{ | |
apply(target, thisArg, argumentsList) { | |
const supabaseInstance = Reflect.apply(target, thisArg, argumentsList); | |
(supabaseInstance as unknown as SupabaseClientConstructor).rpc = new Proxy( | |
(supabaseInstance as unknown as SupabaseClientInstance).rpc, | |
{ | |
apply(target, thisArg, argumentsList) { | |
const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; | |
const isConsumerSpan = argumentsList[0] === 'pop'; | |
if (!isProducerSpan && !isConsumerSpan) { | |
return Reflect.apply(target, thisArg, argumentsList); | |
} | |
if (isProducerSpan) { | |
return instrumentRpcProducer(target, thisArg, argumentsList); | |
} else if (isConsumerSpan) { | |
return instrumentRpcConsumer(target, thisArg, argumentsList); | |
} | |
// If the operation is not a queue operation, return the original function | |
return Reflect.apply(target, thisArg, argumentsList); | |
}, | |
}, | |
); | |
return supabaseInstance; | |
}, | |
}, | |
); | |
markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema); | |
} | |
function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?: string; baggage?: string } }): { | |
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void { | |
if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) { | |
return; | |
} | |
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy( | |
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema, | |
{ | |
apply(target, thisArg, argumentsList) { | |
const supabaseInstance = Reflect.apply(target, thisArg, argumentsList); | |
instrumentRpcMethod(supabaseInstance as unknown as SupabaseClientConstructor); | |
return supabaseInstance; | |
}, | |
}, | |
); | |
markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema); | |
} | |
function instrumentRpcMethod(supabaseInstance: SupabaseClientConstructor): void { | |
supabaseInstance.rpc = new Proxy( | |
(supabaseInstance as unknown as SupabaseClientInstance).rpc, | |
{ | |
apply(target, thisArg, argumentsList) { | |
const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch'; | |
const isConsumerSpan = argumentsList[0] === 'pop'; | |
if (!isProducerSpan && !isConsumerSpan) { | |
return Reflect.apply(target, thisArg, argumentsList); | |
} | |
if (isProducerSpan) { | |
return instrumentRpcProducer(target, thisArg, argumentsList); | |
} else if (isConsumerSpan) { | |
return instrumentRpcConsumer(target, thisArg, argumentsList); | |
} | |
return Reflect.apply(target, thisArg, argumentsList); | |
}, | |
}, | |
); | |
} |
4f19854
to
1c518a2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Function Mutates User Objects Unintentionally
The instrumentRpcProducer
function mutates user-provided message objects by directly adding _sentry
tracing metadata and reassigning the original argumentsList[1]
. This can cause unexpected side effects if the caller reuses or inspects these objects, or if they are frozen/readonly.
packages/core/src/integrations/supabase.ts#L424-L440
sentry-javascript/packages/core/src/integrations/supabase.ts
Lines 424 to 440 in 1c518a2
if (sentryArgumentsQueueParams?.message) { | |
sentryArgumentsQueueParams.message._sentry = { | |
sentry_trace: sentryTrace, | |
baggage: sentryBaggage, | |
}; | |
} else if (sentryArgumentsQueueParams?.messages) { | |
sentryArgumentsQueueParams.messages = sentryArgumentsQueueParams.messages.map(message => { | |
message._sentry = { | |
sentry_trace: sentryTrace, | |
baggage: sentryBaggage, | |
}; | |
return message; | |
}); | |
} | |
argumentsList[1] = sentryArgumentsQueueParams; |
Bug: RPC Consumer Bugs: Error Handling, Response Mutation, Latency Calculation
The instrumentRpcConsumer
function has three main issues:
- Incomplete Error Handling: It lacks a
.catch()
handler for the initial RPC call, preventing spans from being ended and errors from being captured on rejection. This is inconsistent with theinstrumentRpcProducer
function. - Response Mutation: It directly modifies the Supabase response object by deleting the
_sentry
property, which can cause unexpected side effects if the response is read-only or used elsewhere. - Invalid Latency Calculation: The latency calculation can result in
NaN
if theenqueued_at
timestamp is unparseable, leading to invalid span attributes.
packages/core/src/integrations/supabase.ts#L278-L387
sentry-javascript/packages/core/src/integrations/supabase.ts
Lines 278 to 387 in 1c518a2
const instrumentRpcConsumer = (target: any, thisArg: any, argumentsList: any[]): Promise<unknown> => { | |
const [operationName, queueParams] = argumentsList as [ | |
'pop', | |
{ | |
queue_name?: string; | |
}, | |
]; | |
const isConsumerSpan = operationName === 'pop'; | |
const queueName = queueParams?.queue_name; | |
if (!isConsumerSpan) { | |
return Reflect.apply(target, thisArg, argumentsList); // Not a consumer operation | |
} | |
return (Reflect.apply(target, thisArg, argumentsList) as Promise<SupabaseResponse>).then((res: SupabaseResponse) => { | |
const latency = res.data?.[0]?.enqueued_at ? Date.now() - Date.parse(res.data?.[0]?.enqueued_at) : undefined; | |
const { sentryTrace, baggage } = extractTraceAndBaggageFromMessage(res.data?.[0]?.message || {}); | |
// Remove Sentry metadata from the returned message | |
delete res.data?.[0]?.message?._sentry; | |
return continueTrace( | |
{ | |
sentryTrace, | |
baggage, | |
}, | |
() => { | |
return startSpan( | |
{ | |
name: 'supabase.db.rpc', | |
attributes: { | |
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.process', | |
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase', | |
'messaging.system': 'supabase', | |
}, | |
}, | |
span => { | |
const messageId = | |
res?.data?.map(item => (typeof item === 'number' ? item : item.msg_id)).join(',') || undefined; | |
if (messageId) { | |
span.setAttribute('messaging.message.id', messageId); | |
} | |
if (queueName) { | |
span.setAttribute('messaging.destination.name', queueName); | |
} | |
if (latency) { | |
span.setAttribute('messaging.message.receive.latency', latency); | |
} | |
const breadcrumb: SupabaseBreadcrumb = { | |
type: 'supabase', | |
category: `db.rpc.${argumentsList[0]}`, | |
message: `rpc(${argumentsList[0]})`, | |
}; | |
const data: Record<string, unknown> = {}; | |
if (messageId) { | |
data['messaging.message.id'] = messageId; | |
} | |
if (queueName) { | |
data['messaging.destination.name'] = queueName; | |
} | |
if (Object.keys(data).length) { | |
breadcrumb.data = data; | |
} | |
addBreadcrumb(breadcrumb); | |
if (res.error) { | |
const err = new Error(res.error.message) as SupabaseError; | |
if (res.error.code) { | |
err.code = res.error.code; | |
} | |
if (res.error.details) { | |
err.details = res.error.details; | |
} | |
captureException(err, { | |
contexts: { | |
supabase: { | |
queueName, | |
messageId, | |
}, | |
}, | |
}); | |
span.setStatus({ code: SPAN_STATUS_ERROR }); | |
} else { | |
span.setStatus({ code: SPAN_STATUS_OK }); | |
} | |
span.end(); | |
return res; | |
}, | |
); | |
}, | |
); | |
}); |
Was this report helpful? Give feedback by reacting with 👍 or 👎
Resolves: #14611
Summary:
Sample Queue Event: Link
supabaseClient.rpc('<queue_op>', ...)
supabaseClient.schema(...).rpc('queue_op', ...)
producer
ops:send
,send_batch
consumer
op:pop