Summary
DBSP assumes that all computations are deterministic. A non-deterministic UDF will produce wrong outputs. However, such UDFs could be useful when interacting with external services, such as ML inference or vector search (or anything else stateful and/or non-deterministic).
This RFC proposes a new stateful operator that makes non-deterministic UDFs safe to use by caching their outputs.
Design
The new nondet_map operator takes a Z-set s and a closure f and outputs an indexed Z-set, where keys are tuples from the original Z-set and values are computed by applying the closure to each tuple. It maintains an integral i of its output. For each input tuple (k, w), it first looks up k in i. If a match v is found, it outputs the tuple (k, v, w) without calling f. Otherwise, it outputs (k, f(k), w).
         ┌─────────────┐
    s    │             │
────────►│ nondet_map  ├────────────┬──────────►
         │             │            │
         └─────────────┘            │
                ▲                   │
                │i                  │
                │                   │
             ┌──┴──┐                │
             │     │                │
             │  I  │◄───────────────┘
             │     │
             └─────┘
Once a key is added to the integral, it remains there until the total weight of the key drops to 0, i.e., all copies of the key are removed from the collection.
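The caching rule described above can be sketched in plain Rust, without the DBSP crate. This is only an illustration under stated assumptions: the `NondetMap` type, its `step` method, and the use of a `HashMap` as the integral are hypothetical stand-ins, not the proposed operator's actual API or storage.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical stand-in for the operator's state. The `integral` map plays
/// the role of `i`: for each key it holds the cached value and the key's
/// total weight so far.
pub struct NondetMap<K, V, F> {
    f: F,
    integral: HashMap<K, (V, i64)>,
}

impl<K, V, F> NondetMap<K, V, F>
where
    K: Hash + Eq + Clone,
    V: Clone,
    F: FnMut(&K) -> V,
{
    pub fn new(f: F) -> Self {
        Self { f, integral: HashMap::new() }
    }

    /// Consume a batch of (key, weight) changes and return the matching
    /// (key, value, weight) output changes.
    pub fn step(&mut self, delta: &[(K, i64)]) -> Vec<(K, V, i64)> {
        let mut out = Vec::with_capacity(delta.len());
        for (k, w) in delta {
            if *w == 0 {
                continue;
            }
            // Reuse the cached value if the key is in the integral;
            // otherwise invoke the (possibly non-deterministic) closure once.
            let (v, total) = match self.integral.remove(k) {
                Some((v, total)) => (v, total + w),
                None => ((self.f)(k), *w),
            };
            out.push((k.clone(), v.clone(), *w));
            // Keep the entry only while the key's total weight is non-zero.
            if total != 0 {
                self.integral.insert(k.clone(), (v, total));
            }
        }
        out
    }
}
```

In this sketch, inserting a key twice calls the closure once, a retraction reuses the cached value, and a key whose total weight reaches 0 is evicted so that a later re-insertion calls the closure again.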
Alternative design with refresh
Alternatively, instead of outputting the cached value again, the operator could invoke f every time, and if the new output is different from the stored one, it retracts the old tuple (k, v, old_weight) and inserts the new tuple (k, f(k), old_weight + delta_weight).
Yet another variation would only re-evaluate f if the current value is sufficiently stale, e.g., hasn't been updated for a day.
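A minimal sketch of the refresh variant, again in plain Rust with hypothetical names (`RefreshingNondetMap`, `step`) and a `HashMap` standing in for the integral. It assumes values can be compared for equality, and it expresses the retraction of the old tuple as an output with weight -old_weight. The staleness check from the second variation is not modeled here.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical refresh variant: `f` is called for every input change, and a
/// changed value replaces the cached one.
pub struct RefreshingNondetMap<K, V, F> {
    f: F,
    integral: HashMap<K, (V, i64)>,
}

impl<K, V, F> RefreshingNondetMap<K, V, F>
where
    K: Hash + Eq + Clone,
    V: Clone + PartialEq,
    F: FnMut(&K) -> V,
{
    pub fn new(f: F) -> Self {
        Self { f, integral: HashMap::new() }
    }

    pub fn step(&mut self, delta: &[(K, i64)]) -> Vec<(K, V, i64)> {
        let mut out = Vec::new();
        for (k, w) in delta {
            let fresh = (self.f)(k);
            match self.integral.remove(k) {
                Some((old, total)) => {
                    let new_total = total + w;
                    if old != fresh {
                        // Value changed: retract every copy of the old tuple,
                        // then insert the new tuple at the updated weight.
                        out.push((k.clone(), old, -total));
                        if new_total != 0 {
                            out.push((k.clone(), fresh.clone(), new_total));
                        }
                    } else if *w != 0 {
                        // Value unchanged: behave like the cached design.
                        out.push((k.clone(), fresh.clone(), *w));
                    }
                    if new_total != 0 {
                        self.integral.insert(k.clone(), (fresh, new_total));
                    }
                }
                None => {
                    if *w != 0 {
                        out.push((k.clone(), fresh.clone(), *w));
                        self.integral.insert(k.clone(), (fresh, *w));
                    }
                }
            }
        }
        out
    }
}
```

The cost of this variant is one call to f per input change, which may matter when f is a remote service call.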
GC
If the input stream has lateness, we can apply integrate_trace_retain_keys to the integral to GC old keys.
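The effect of such GC can be illustrated with a plain-Rust analogue. This sketch does not call integrate_trace_retain_keys and makes no claim about its signature; the `Key` layout (timestamp plus payload), the `gc_integral` function, and the waterline computation are assumptions made for illustration only.

```rust
use std::collections::HashMap;

/// Hypothetical key type: an event timestamp plus a payload.
pub type Key = (u64, String);

/// Drop cached entries whose timestamp is older than `max_ts_seen - lateness`.
/// With bounded lateness, no future input can reference those keys, so their
/// cached values are no longer needed. Returns the number of entries dropped.
pub fn gc_integral<V>(
    integral: &mut HashMap<Key, V>,
    max_ts_seen: u64,
    lateness: u64,
) -> usize {
    let waterline = max_ts_seen.saturating_sub(lateness);
    let before = integral.len();
    integral.retain(|(ts, _), _| *ts >= waterline);
    before - integral.len()
}
```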
Discussion
This design doesn't guarantee determinism in the strongest possible sense, but it does make non-deterministic functions safe to use in DBSP. In particular, if a key k is deleted and then added again to the input collection, the new value of f(k) can be different from the first value (likewise, in the version of the operator with refresh, the output can change even while the key is still present in the collection).