8000 feat(core): Add Supabase Queues support by onurtemizkan · Pull Request #15921 · getsentry/sentry-javascript · GitHub
[go: up one dir, main page]

Skip to content

feat(core): Add Supabase Queues support #15921

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from

Conversation

onurtemizkan
Copy link
Collaborator
@onurtemizkan onurtemizkan commented Mar 28, 2025

Resolves: #14611

Summary:

Sample Queue Event: Link

  • Adds queue spans and breadcrumbs for Supabase Queue operations.
    • Usage with supabaseClient.rpc('<queue_op>', ...)
    • Usage with supabaseClient.schema(...).rpc('queue_op', ...)
  • Mapped operations to their respective op names documented here
    • producer ops: send, send_batch
    • consumer op: pop
  • Added browser integration tests, and e2e tests.

Copy link
Contributor
github-actions bot commented Mar 28, 2025

size-limit report 📦

Path Size % Change Change
@sentry/browser 23.99 kB - -
@sentry/browser - with treeshaking flags 23.76 kB - -
@sentry/browser (incl. Tracing) 39.59 kB - -
@sentry/browser (incl. Tracing, Replay) 77.69 kB - -
@sentry/browser (incl. Tracing, Replay) - with treeshaking flags 70.78 kB - -
@sentry/browser (incl. Tracing, Replay with Canvas) 82.45 kB - -
@sentry/browser (incl. Tracing, Replay, Feedback) 94.57 kB - -
@sentry/browser (incl. Feedback) 40.75 kB - -
@sentry/browser (incl. sendFeedback) 28.7 kB - -
@sentry/browser (incl. FeedbackAsync) 33.59 kB - -
@sentry/react 25.76 kB - -
@sentry/react (incl. Tracing) 41.58 kB - -
@sentry/vue 28.37 kB - -
@sentry/vue (incl. Tracing) 41.4 kB - -
@sentry/svelte 24.01 kB - -
CDN Bundle 25.5 kB - -
CDN Bundle (incl. Tracing) 39.6 kB - -
CDN Bundle (incl. Tracing, Replay) 75.5 kB - -
CDN Bundle (incl. Tracing, Replay, Feedback) 80.96 kB - -
CDN Bundle - uncompressed 74.5 kB - -
CDN Bundle (incl. Tracing) - uncompressed 117.63 kB - -
CDN Bundle (incl. Tracing, Replay) - uncompressed 231.68 kB - -
CDN Bundle (incl. Tracing, Replay, Feedback) - uncompressed 244.5 kB - -
@sentry/nextjs (client) 43.22 kB - -
@sentry/sveltekit (client) 40.04 kB - -
@sentry/node 162.02 kB -0.01% -1 B 🔽
@sentry/node - without tracing 98.64 kB -0.01% -1 B 🔽
@sentry/aws-serverless 124.4 kB -0.01% -1 B 🔽

View base workflow run

@onurtemizkan onurtemizkan force-pushed the onur/supabase-queues branch from bbadd60 to e7b3370 Compare March 31, 2025 09:44
@onurtemizkan onurtemizkan force-pushed the onur/supabase-integration branch from 1bc2897 to d387514 Compare March 31, 2025 10:26
@onurtemizkan onurtemizkan force-pushed the onur/supabase-queues branch from e7b3370 to 56a2d84 Compare March 31, 2025 14:12
@onurtemizkan onurtemizkan linked an issue Mar 31, 2025 that may be closed by this pull request
@onurtemizkan onurtemizkan force-pushed the onur/supabase-queues branch from 56a2d84 to 13d60e0 Compare March 31, 2025 14:25
@onurtemizkan onurtemizkan force-pushed the onur/supabase-integration branch 4 times, most recently from 423397d to 556703c Compare April 14, 2025 15:26
Base automatically changed from onur/supabase-integration to develop April 17, 2025 13:35
@onurtemizkan onurtemizkan force-pushed the onur/supabase-queues branch from 13d60e0 to 74869e4 Compare April 22, 2025 14:16
@onurtemizkan onurtemizkan marked this pull request as ready for review April 24, 2025 09:32
@onurtemizkan onurtemizkan requested review from lforst and smeubank April 24, 2025 09:33
@onurtemizkan onurtemizkan force-pushed the onur/supabase-queues branch from e63915a to 719c8b6 Compare April 24, 2025 14:11
lforst
},
},
async span => {
return (Reflect.apply(target, thisArg, argumentsList) as Promise<unknown>).then((res: unknown) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably also end the span when it throws/rejects? We can also set the status of the span then.

name: 'supabase.db.rpc',
attributes: {
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase',
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: op,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can add the messaging.system attribute to be 'supabase' as described in https://develop.sentry.dev/sdk/telemetry/traces/modules/queues/

Comment on lines 238 to 239
const isProducerSpan = argumentsList[0] === 'enqueue';
const isConsumerSpan = argumentsList[0] === 'dequeue';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this recently changed, but here they show send and pop as rpc args: https://supabase.com/docs/guides/queues/quickstart#enqueueing-and-dequeueing-messages 🤔

@onurtemizkan onurtemizkan force-pushed the onur/supabase-queues branch 4 times, most recently from fb8eeb1 to a8da234 Compare May 31, 2025 20:58
@onurtemizkan onurtemizkan force-pushed the onur/supabase-queues branch 2 times, most recently from ff2f804 to 5ce5ee9 Compare June 23, 2025 15:58
@onurtemizkan onurtemizkan requested review from Lms24 and AbhiPrasad June 23, 2025 23:39
@AbhiPrasad
Copy link
Member

@sentry review

Copy link

On it! We are reviewing the PR and will provide feedback shortly.

Copy link

PR Description

This pull request introduces instrumentation for Supabase queue operations using pgmq, enabling Sentry to capture spans and breadcrumbs for queue publishing and processing. This provides visibility into asynchronous task execution within Supabase applications.

Click to see more

Key Technical Changes

The key technical changes include: 1) Instrumenting the rpc method of the Supabase client to detect queue operations (send, send_batch, pop). 2) Creating spans for queue publish and process operations, capturing relevant metadata like queue name and message ID. 3) Adding breadcrumbs for queue operations to provide context in Sentry events. 4) Modifying queue messages to inject Sentry trace context for distributed tracing. 5) Adding e2e tests to verify the instrumentation in a Next.js application.

Architecture Decisions

The primary architectural decision involves using Proxy objects to intercept calls to the Supabase client's rpc method and the schema(...).rpc(...) method. This allows for non-intrusive instrumentation without modifying the Supabase client's core code. The trace context is injected directly into the message body to propagate tracing information across queue operations.

Dependencies and Interactions

This integration depends on the @supabase/supabase-js library and interacts with Sentry's tracing and breadcrumb APIs. It also relies on the pgmq extension being enabled in the Supabase database. The integration injects Sentry trace context into queue messages, which requires consumers to be instrumented to extract and continue the trace.

Risk Considerations

Potential risks include: 1) Performance overhead due to the instrumentation, although proxies are generally performant. 2) Incorrectly identifying queue operations, leading to spurious spans. 3) Failure to propagate trace context if consumers are not properly instrumented. 4) Security implications of modifying message bodies, although the injected data is limited to Sentry trace context. 5) The modification of the arguments list in place could lead to unexpected side effects.

Notable Implementation Details

Notable implementation details include: 1) The use of continueTrace to link consumer spans to producer spans. 2) The handling of both rpc and schema(...).rpc(...) calls. 3) The injection of Sentry trace context into the message body. 4) The vendoring of SQL code from the Supabase repository to enable queue access locally.

Comment on lines 230 to 266

function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void {
if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) {
return;
}

(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy(
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema,
{
apply(target, thisArg, argumentsList) {
const supabaseInstance = Reflect.apply(target, thisArg, argumentsList);

(supabaseInstance as unknown as SupabaseClientConstructor).rpc = new Proxy(
(supabaseInstance as unknown as SupabaseClientInstance).rpc,
{
apply(target, thisArg, argumentsList) {
const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch';
const isConsumerSpan = argumentsList[0] === 'pop';

if (!isProducerSpan && !isConsumerSpan) {
return Reflect.apply(target, thisArg, argumentsList);
}

if (isProducerSpan) {
return instrumentRpcProducer(target, thisArg, argumentsList);
} else if (isConsumerSpan) {
return instrumentRpcConsumer(target, thisArg, argumentsList);
}

// If the operation is not a queue operation, return the original function
return Reflect.apply(target, thisArg, argumentsList);
},
},
);

return supabaseInstance;
},
},
);

markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema);
}

function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?: string; baggage?: string } }): {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function instrumentRpcReturnedFromSchemaCall has deeply nested proxy wrapping which makes it hard to follow and maintain. Consider extracting the inner proxy logic into a separate function for better readability.

Suggested change
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void {
if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) {
return;
}
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy(
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema,
{
apply(target, thisArg, argumentsList) {
const supabaseInstance = Reflect.apply(target, thisArg, argumentsList);
(supabaseInstance as unknown as SupabaseClientConstructor).rpc = new Proxy(
(supabaseInstance as unknown as SupabaseClientInstance).rpc,
{
apply(target, thisArg, argumentsList) {
const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch';
const isConsumerSpan = argumentsList[0] === 'pop';
if (!isProducerSpan && !isConsumerSpan) {
return Reflect.apply(target, thisArg, argumentsList);
}
if (isProducerSpan) {
return instrumentRpcProducer(target, thisArg, argumentsList);
} else if (isConsumerSpan) {
return instrumentRpcConsumer(target, thisArg, argumentsList);
}
// If the operation is not a queue operation, return the original function
return Reflect.apply(target, thisArg, argumentsList);
},
},
);
return supabaseInstance;
},
},
);
markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema);
}
function extractTraceAndBaggageFromMessage(message: { _sentry?: { sentry_trace?: string; baggage?: string } }): {
function instrumentRpcReturnedFromSchemaCall(SupabaseClient: unknown): void {
if (isInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema)) {
return;
}
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema = new Proxy(
(SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema,
{
apply(target, thisArg, argumentsList) {
const supabaseInstance = Reflect.apply(target, thisArg, argumentsList);
instrumentRpcMethod(supabaseInstance as unknown as SupabaseClientConstructor);
return supabaseInstance;
},
},
);
markAsInstrumented((SupabaseClient as unknown as SupabaseClientConstructor).prototype.schema);
}
function instrumentRpcMethod(supabaseInstance: SupabaseClientConstructor): void {
supabaseInstance.rpc = new Proxy(
(supabaseInstance as unknown as SupabaseClientInstance).rpc,
{
apply(target, thisArg, argumentsList) {
const isProducerSpan = argumentsList[0] === 'send' || argumentsList[0] === 'send_batch';
const isConsumerSpan = argumentsList[0] === 'pop';
if (!isProducerSpan && !isConsumerSpan) {
return Reflect.apply(target, thisArg, argumentsList);
}
if (isProducerSpan) {
return instrumentRpcProducer(target, thisArg, argumentsList);
} else if (isConsumerSpan) {
return instrumentRpcConsumer(target, thisArg, argumentsList);
}
return Reflect.apply(target, thisArg, argumentsList);
},
},
);
}

cursor[bot]

This comment was marked as outdated.

cursor[bot]

This comment was marked as outdated.

@onurtemizkan onurtemizkan force-pushed the onur/supabase-queues branch from 4f19854 to 1c518a2 Compare July 3, 2025 14:40
Copy link
@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Function Mutates User Objects Unintentionally

The instrumentRpcProducer function mutates user-provided message objects by directly adding _sentry tracing metadata and reassigning the original argumentsList[1]. This can cause unexpected side effects if the caller reuses or inspects these objects, or if they are frozen/readonly.

packages/core/src/integrations/supabase.ts#L424-L440

if (sentryArgumentsQueueParams?.message) {
sentryArgumentsQueueParams.message._sentry = {
sentry_trace: sentryTrace,
baggage: sentryBaggage,
};
} else if (sentryArgumentsQueueParams?.messages) {
sentryArgumentsQueueParams.messages = sentryArgumentsQueueParams.messages.map(message => {
message._sentry = {
sentry_trace: sentryTrace,
baggage: sentryBaggage,
};
return message;
});
}
argumentsList[1] = sentryArgumentsQueueParams;

Fix in Cursor


Bug: RPC Consumer Bugs: Error Handling, Response Mutation, Latency Calculation

The instrumentRpcConsumer function has three main issues:

  1. Incomplete Error Handling: It lacks a .catch() handler for the initial RPC call, preventing spans from being ended and errors from being captured on rejection. This is inconsistent with the instrumentRpcProducer function.
  2. Response Mutation: It directly modifies the Supabase response object by deleting the _sentry property, which can cause unexpected side effects if the response is read-only or used elsewhere.
  3. Invalid Latency Calculation: The latency calculation can result in NaN if the enqueued_at timestamp is unparseable, leading to invalid span attributes.

packages/core/src/integrations/supabase.ts#L278-L387

const instrumentRpcConsumer = (target: any, thisArg: any, argumentsList: any[]): Promise<unknown> => {
const [operationName, queueParams] = argumentsList as [
'pop',
{
queue_name?: string;
},
];
const isConsumerSpan = operationName === 'pop';
const queueName = queueParams?.queue_name;
if (!isConsumerSpan) {
return Reflect.apply(target, thisArg, argumentsList); // Not a consumer operation
}
return (Reflect.apply(target, thisArg, argumentsList) as Promise<SupabaseResponse>).then((res: SupabaseResponse) => {
const latency = res.data?.[0]?.enqueued_at ? Date.now() - Date.parse(res.data?.[0]?.enqueued_at) : undefined;
const { sentryTrace, baggage } = extractTraceAndBaggageFromMessage(res.data?.[0]?.message || {});
// Remove Sentry metadata from the returned message
delete res.data?.[0]?.message?._sentry;
return continueTrace(
{
sentryTrace,
baggage,
},
() => {
return startSpan(
{
name: 'supabase.db.rpc',
attributes: {
[SEMANTIC_ATTRIBUTE_SENTRY_OP]: 'queue.process',
[SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: 'auto.db.supabase',
'messaging.system': 'supabase',
},
},
span => {
const messageId =
res?.data?.map(item => (typeof item === 'number' ? item : item.msg_id)).join(',') || undefined;
if (messageId) {
span.setAttribute('messaging.message.id', messageId);
}
if (queueName) {
span.setAttribute('messaging.destination.name', queueName);
}
if (latency) {
span.setAttribute('messaging.message.receive.latency', latency);
}
const breadcrumb: SupabaseBreadcrumb = {
type: 'supabase',
category: `db.rpc.${argumentsList[0]}`,
message: `rpc(${argumentsList[0]})`,
};
const data: Record<string, unknown> = {};
if (messageId) {
data['messaging.message.id'] = messageId;
}
if (queueName) {
data['messaging.destination.name'] = queueName;
}
if (Object.keys(data).length) {
breadcrumb.data = data;
}
addBreadcrumb(breadcrumb);
if (res.error) {
const err = new Error(res.error.message) as SupabaseError;
if (res.error.code) {
err.code = res.error.code;
}
if (res.error.details) {
err.details = res.error.details;
}
captureException(err, {
contexts: {
supabase: {
queueName,
messageId,
},
},
});
span.setStatus({ code: SPAN_STATUS_ERROR });
} else {
span.setStatus({ code: SPAN_STATUS_OK });
}
span.end();
return res;
},
);
},
);
});

Fix in Cursor


Was this report helpful? Give feedback by reacting with 👍 or 👎

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support for Supabase Queues
3 participants
0