[RFC] Non-deterministic operator · Issue #2619 · feldera/feldera · GitHub
[RFC] Non-deterministic operator #2619

@ryzhyk

Summary

DBSP assumes that all computations are deterministic. A non-deterministic UDF can cause incorrect outputs. However, such UDFs could be useful when interacting with external services, such as ML inference or vector search (or anything else stateful and/or non-deterministic).
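To make the failure mode concrete, here is a minimal Python sketch of a stateless incremental map over (key, weight) changes. It is a toy model, not DBSP code: `nondet_udf`, `naive_map`, and `integrate` are hypothetical names introduced only for illustration. Inserting a key and then deleting it should leave the output empty, but because the UDF returns a different value on the second call, the retraction does not cancel the insertion.

```python
import itertools
from collections import Counter

# Hypothetical non-deterministic UDF: returns a fresh value on every call.
_calls = itertools.count()

def nondet_udf(key):
    return f"{key}-v{next(_calls)}"

def naive_map(delta, f):
    """Stateless incremental map: applies f to every (key, weight) change."""
    return [((k, f(k)), w) for k, w in delta]

def integrate(changes):
    """Sum weights per record and drop records whose total weight is zero."""
    total = Counter()
    for record, w in changes:
        total[record] += w
    return {r: w for r, w in total.items() if w != 0}

# Insert "a", then delete it. The input collection is empty afterwards.
changes = naive_map([("a", 1)], nondet_udf) + naive_map([("a", -1)], nondet_udf)
view = integrate(changes)
print(view)  # non-empty: one record with weight +1 and another with weight -1
```

The leftover record with negative weight is exactly the kind of wrong output that the proposed operator is meant to prevent.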

This RFC proposes a new stateful operator that allows using non-deterministic UDFs safely, basically by caching their outputs.

Design

The new nondet_map operator takes a Z-set and a closure f and outputs an indexed Z-set whose keys are tuples from the original Z-set and whose values are computed by applying the closure to each tuple. It maintains an integral i of its output. For each input tuple (k, w), it first looks up k in i. If a match v is found, it outputs the tuple (k, v, w). Otherwise, it outputs (k, f(k), w).

         ┌─────────────┐                        
   s     │             │                        
────────►│ nondet_map  ├────────────┬──────────►
         │             │            │           
         └─────────────┘            │           
                ▲                   │           
                │i                  │           
                │                   │           
             ┌──┴──┐                │           
             │     │                │           
             │  I  │◄───────────────┘           
             │     │                            
             └─────┘                            

Once a key is added to the integral, it remains there until the total weight of the key drops to 0, i.e., all copies of the key are removed from the collection.
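The lookup-or-compute behavior and the eviction rule can be sketched in a few lines of Python. This is a toy model under simplifying assumptions, not the DBSP operator: the class name `NondetMap`, its `step` method, and the dictionary standing in for the integral are all hypothetical, and changes within a batch are processed one at a time rather than through a delayed integral.

```python
class NondetMap:
    """Toy model of the proposed operator; not the DBSP API."""

    def __init__(self, f):
        self.f = f
        self.integral = {}  # key -> [cached value, total weight]

    def step(self, delta):
        """Process (key, weight) changes and return (key, value, weight) tuples."""
        out = []
        for k, w in delta:
            if k in self.integral:
                entry = self.integral[k]                   # reuse the cached value
            else:
                entry = self.integral[k] = [self.f(k), 0]  # first sighting: call f once
            entry[1] += w
            out.append((k, entry[0], w))
            if entry[1] == 0:
                del self.integral[k]                       # all copies of k are gone
        return out


# Usage with a UDF that records how often it is invoked.
calls = []

def udf(key):
    calls.append(key)
    return f"{key}#{len(calls)}"

op = NondetMap(udf)
r1 = op.step([("a", 1)])    # f is called, the value is cached
r2 = op.step([("a", 1)])    # cached value is reused, f is not called
r3 = op.step([("a", -2)])   # retraction uses the same value, then the key is evicted
```

Because the retraction in `r3` carries the same value as the insertions, the output weights for that (key, value) pair sum to zero, which is what downstream operators rely on.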

Alternative design with refresh

Alternatively, instead of outputting the cached value again, the operator could invoke f every time. If the new output differs from the stored one, it retracts the old tuple (k, v, old_weight) and inserts the new tuple (k, f(k), old_weight + delta_weight).
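A rough Python sketch of the refresh variant follows. The function `refresh_step` and the dictionary-based integral are hypothetical, and the sketch assumes that "retracting" a tuple means emitting it with the negated accumulated weight.

```python
def refresh_step(integral, f, delta):
    """Toy refresh variant. integral maps key -> (value, total weight)."""
    out = []
    for k, w in delta:
        new = f(k)                                   # f is invoked on every change
        old_value, old_w = integral.get(k, (new, 0))
        emitted = w
        if old_w != 0 and new != old_value:
            out.append((k, old_value, -old_w))       # retract everything emitted so far
            emitted = old_w + w                      # re-insert at the full new weight
        if emitted != 0:
            out.append((k, new, emitted))
        if old_w + w == 0:
            integral.pop(k, None)                    # key has left the collection
        else:
            integral[k] = (new, old_w + w)
    return out


# Usage: the third call to the UDF returns a different value.
values = iter(["x", "x", "y"])
integral = {}
s1 = refresh_step(integral, lambda k: next(values), [("a", 1)])
s2 = refresh_step(integral, lambda k: next(values), [("a", 1)])
s3 = refresh_step(integral, lambda k: next(values), [("a", 1)])
```

In the third step the old value is retracted with weight -2 and the new value is inserted with weight 3, so the integrated output again holds a single value for the key.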

Yet another variation would only re-evaluate f if the current value is sufficiently stale, e.g., hasn't been updated for a day.
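The staleness-based variation could look roughly like the Python sketch below. Everything here is an assumption made for illustration: the function name `stale_refresh_step`, the explicit `now` and `max_age` parameters (in seconds), and the choice to reset the timestamp whenever f is re-evaluated, even if the value did not change.

```python
DAY = 24 * 60 * 60

def stale_refresh_step(integral, f, delta, now, max_age=DAY):
    """Toy model. integral maps key -> (value, total weight, last update time)."""
    out = []
    for k, w in delta:
        emitted = w
        if k not in integral:
            value, old_w, updated = f(k), 0, now
        else:
            value, old_w, updated = integral[k]
            if now - updated > max_age:              # cached value is stale
                fresh, updated = f(k), now
                if fresh != value:
                    out.append((k, value, -old_w))   # retract the stale value
                    value, emitted = fresh, old_w + w
        if emitted != 0:
            out.append((k, value, emitted))
        if old_w + w == 0:
            integral.pop(k, None)
        else:
            integral[k] = (value, old_w + w, updated)
    return out


# Usage: the UDF would return "y" on its second call, but it is only
# consulted again once the cached value is more than a day old.
values = iter(["x", "y"])
integral = {}
t0 = stale_refresh_step(integral, lambda k: next(values), [("a", 1)], now=0)
t1 = stale_refresh_step(integral, lambda k: next(values), [("a", 1)], now=3600)
t2 = stale_refresh_step(integral, lambda k: next(values), [("a", 1)], now=90000)
```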

GC

If the input stream has lateness, we can apply integrate_trace_retain_keys to the integral to GC old keys.
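The idea behind this GC step can be illustrated with a small Python model. It does not reproduce the signature or semantics of integrate_trace_retain_keys; `gc_integral`, the (event_time, payload) key layout, and the waterline computed as the largest observed time minus the lateness bound are all assumptions made for the sketch.

```python
def gc_integral(integral, waterline, key_time):
    """Drop cached entries whose key time is below the waterline.

    With a lateness bound, no future change can reference such keys,
    so their cached values will never be looked up again.
    """
    expired = [k for k in integral if key_time(k) < waterline]
    for k in expired:
        del integral[k]
    return len(expired)


# Keys are (event_time, payload) pairs; values are (cached value, weight).
integral = {
    (10, "a"): ("va", 1),
    (25, "b"): ("vb", 2),
    (40, "c"): ("vc", 1),
}
max_seen, lateness = 40, 15
removed = gc_integral(integral, max_seen - lateness, key_time=lambda k: k[0])
```

Only the entry with event time 10 falls below the waterline of 25, so it is the only one discarded.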

Discussion

This design doesn't guarantee determinism in the strongest possible sense, but it does make non-deterministic functions safe to use in DBSP. In particular, if a value k is deleted and then added again to the input collection, the new value of f(k) can be different from the first value (likewise, in the version of the operator with refresh, the output can change even while the value is still present in the collection).
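The caveat about deleting and re-adding a key can be traced with a compact Python model of the caching behavior. The names `f` (here a UDF that returns a new version string on each call) and `cached_step` are hypothetical and exist only for this trace.

```python
import itertools

_versions = itertools.count(1)

def f(key):
    # Hypothetical non-deterministic UDF: a new version number on every call.
    return f"{key}@v{next(_versions)}"

def cached_step(integral, delta):
    """Toy caching operator. integral maps key -> (value, total weight)."""
    out = []
    for k, w in delta:
        value, total = integral[k] if k in integral else (f(k), 0)
        out.append((k, value, w))
        if total + w == 0:
            integral.pop(k, None)      # key fully removed: cached value is forgotten
        else:
            integral[k] = (value, total + w)
    return out

integral = {}
first = cached_step(integral, [("k", 1)])    # f is called
again = cached_step(integral, [("k", 1)])    # cached value is reused
gone = cached_step(integral, [("k", -2)])    # retraction matches, key is evicted
back = cached_step(integral, [("k", 1)])     # f is called again with a new result
```

Every retraction still cancels an earlier insertion exactly, so the output stays consistent, but the value observed for "k" after re-insertion differs from the one observed before deletion.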

Metadata

    Labels

    DBSP core (related to the core DBSP library), SQL compiler (related to the SQL compiler)
