The add_input_map operator keeps an integral of the input, which is necessary to automatically retract values whose key is being overwritten (more precisely, this happens inside the input_upsert operator, invoked by add_input_map). It also supports filtering out updates that violate key or value bounds. This latter functionality is currently not used by the compiler, which is why the issue described below only manifested in DBSP tests.
The design of the filtering mechanism is flawed in that it only works in conjunction with GC, but GC cannot be applied soundly to the upsert operator. The idea was that the user calls integrate_trace_retain_values on the output stream, setting value bounds, and the upsert operator consults these bounds to decide which updates should be ignored. However, this means that GC can remove old values from the integral, so the upsert operator may not see them (or may see them with the wrong weights) and fail to reject updates to old values. Worse (and this actually happened in tests), it can see values that were already deleted or overwritten and reject valid updates. This can happen when GC removes one of several copies of a tuple.
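To make the failure mode concrete, here is a toy model in Rust. It is not DBSP code, and it rests on three assumptions. The integral is a list of batches of ((key, value), weight) tuples. GC drops tuples below the value bound from one batch at a time. The filter rejects an update if the key's current value or the new value is below the bound. The names lookup, gc_batch, and accept_update are hypothetical. In the example, value 5 for key 1 is inserted and later deleted in separate batches. Once GC drops only the retraction, the deleted value reappears and a valid update is rejected.

```rust
use std::collections::BTreeMap;

// Hypothetical toy model (not DBSP): a trace is a list of batches of
// ((key, value), weight) tuples.
type Batch = Vec<((u32, u32), i64)>;

/// Current value of `key`: sum weights across all batches and return a value
/// whose net weight is positive.
fn lookup(trace: &[Batch], key: u32) -> Option<u32> {
    let mut acc: BTreeMap<u32, i64> = BTreeMap::new();
    for batch in trace {
        for &((k, v), w) in batch {
            if k == key {
                *acc.entry(v).or_insert(0) += w;
            }
        }
    }
    acc.into_iter().find(|&(_, w)| w > 0).map(|(v, _)| v)
}

/// Toy GC applied to a single batch: drop tuples whose value is below `bound`.
fn gc_batch(batch: &mut Batch, bound: u32) {
    batch.retain(|&((_, v), _)| v >= bound);
}

/// Assumed filtering rule: reject an update that would overwrite a value
/// below the bound or insert a value below the bound.
fn accept_update(trace: &[Batch], key: u32, new_val: u32, bound: u32) -> bool {
    let old_ok = lookup(trace, key).map_or(true, |old| old >= bound);
    old_ok && new_val >= bound
}

fn main() {
    let bound = 10;
    // Value 5 for key 1 was inserted and later deleted, in two separate batches.
    let mut trace: Vec<Batch> = vec![vec![((1, 5), 1)], vec![((1, 5), -1)]];
    assert_eq!(lookup(&trace, 1), None);
    assert!(accept_update(&trace, 1, 20, bound)); // valid update is accepted

    // GC reaches the second batch first and removes only the retraction.
    gc_batch(&mut trace[1], bound);
    assert_eq!(lookup(&trace, 1), Some(5)); // the deleted value reappears
    assert!(!accept_update(&trace, 1, 20, bound)); // the same valid update is now rejected
}
```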
In short, if we want upsert to reject updates that violate lateness, we cannot also GC the collection.
Clarification 1: The operator implements the filtering logic correctly, but the only way to use it is via integrate_trace_retain_values, which is not sound.
Clarification 2: The operator also implements filtering on keys (useful in the rare cases where keys have lateness) via integrate_trace_retain_keys. I believe that this API is sound: if an update arrives for a very old key, it is rejected without looking up the old key, since that key is below the waterline. GC is also sound for add_input_set for the same reason.
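The argument for key lateness can be illustrated with another toy model. The names are hypothetical and this is not the DBSP API. A key below the waterline is rejected by comparing the key alone, so it makes no difference whether GC has already discarded that key from the integral. Running the same updates with and without GC yields identical accept/reject decisions and identical contents at or above the waterline.

```rust
use std::collections::BTreeMap;

/// Hypothetical toy upsert with key lateness: keys below `waterline` are late.
struct KeyedUpsert {
    integral: BTreeMap<u64, &'static str>,
    waterline: u64,
    gc: bool,
}

impl KeyedUpsert {
    fn new(gc: bool) -> Self {
        Self { integral: BTreeMap::new(), waterline: 0, gc }
    }

    /// Advance the waterline; with GC enabled, discard every key below it.
    fn advance(&mut self, waterline: u64) {
        self.waterline = waterline;
        if self.gc {
            // split_off keeps keys >= waterline in the returned map.
            self.integral = self.integral.split_off(&waterline);
        }
    }

    /// Returns true if the update was applied.
    fn upsert(&mut self, key: u64, val: &'static str) -> bool {
        // Late keys are rejected from the key alone; the integral is not consulted.
        if key < self.waterline {
            return false;
        }
        self.integral.insert(key, val);
        true
    }

    /// Contents at or above the waterline.
    fn live(&self) -> Vec<(u64, &'static str)> {
        self.integral.range(self.waterline..).map(|(k, v)| (*k, *v)).collect()
    }
}

fn run(gc: bool) -> (Vec<bool>, Vec<(u64, &'static str)>) {
    let mut op = KeyedUpsert::new(gc);
    let mut decisions = Vec::new();
    decisions.push(op.upsert(1, "a"));
    decisions.push(op.upsert(5, "b"));
    op.advance(3);
    decisions.push(op.upsert(1, "c")); // late key: rejected
    decisions.push(op.upsert(5, "d")); // overwrites "b"
    (decisions, op.live())
}

fn main() {
    // GC does not change any decision or any live value.
    assert_eq!(run(true), run(false));
}
```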
A sound API would specify lateness as an input to upsert directly, so it can do filtering without also doing GC.
Today the compiler does not GC the output of the upsert operator (in fact, it doesn't even see this operator, which is instantiated in adapters when registering a new input table with a primary key). Instead, it attaches the "controlled filter" operator downstream from upsert. Unfortunately, this is also unsound. Consider two updates to the same key that arrive in the same batch. The first update is valid and should be applied, but the second violates lateness (i.e., it is too far in the past) and should be ignored. What happens today is that upsert applies both updates in order, effectively discarding the first update and keeping only the second one. The controlled filter then filters out the second update. The net result is that the old record is deleted and no new record is inserted.
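The following toy model reproduces this scenario. It assumes that upsert applies a batch in arrival order and emits the net change per key. It also assumes that the controlled filter drops any tuple whose lateness column (here a timestamp) is below the bound. upsert and controlled_filter are hypothetical stand-ins, not the actual operators.

```rust
use std::collections::BTreeMap;

type Key = u32;
type Ts = u64; // the column with lateness

/// Toy upsert: apply updates in arrival order, then emit the net delta per key.
fn upsert(integral: &mut BTreeMap<Key, Ts>, batch: &[(Key, Ts)]) -> Vec<((Key, Ts), i64)> {
    // Remember the pre-batch value of every touched key.
    let old: BTreeMap<Key, Option<Ts>> =
        batch.iter().map(|&(k, _)| (k, integral.get(&k).copied())).collect();
    for &(k, v) in batch {
        integral.insert(k, v); // later updates overwrite earlier ones
    }
    let mut delta = Vec::new();
    for (k, old_v) in old {
        let new_v = integral.get(&k).copied();
        if old_v != new_v {
            if let Some(v) = old_v { delta.push(((k, v), -1)); }
            if let Some(v) = new_v { delta.push(((k, v), 1)); }
        }
    }
    delta
}

/// Toy controlled filter: drop tuples (insertions and retractions alike) below the bound.
fn controlled_filter(delta: Vec<((Key, Ts), i64)>, bound: Ts) -> Vec<((Key, Ts), i64)> {
    delta.into_iter().filter(|&((_, v), _)| v >= bound).collect()
}

fn main() {
    let mut integral: BTreeMap<Key, Ts> = BTreeMap::from([(1, 100)]);
    // A valid update (120) followed by a late one (50) for the same key.
    let delta = upsert(&mut integral, &[(1, 120), (1, 50)]);
    assert_eq!(delta, vec![((1, 100), -1), ((1, 50), 1)]);
    // Only the retraction survives: the old record is deleted, nothing is inserted.
    assert_eq!(controlled_filter(delta, 90), vec![((1, 100), -1)]);
}
```

Note also that the toy integral now holds timestamp 50, which is out of sync with what downstream operators have seen.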
A correct solution is to rely on the upsert operator to do the filtering, as described above, instead of filtering downstream from it.
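A minimal sketch of this approach, again as a toy model, passes the bound to upsert directly. Each update is checked before it is applied. A late update is therefore skipped without undoing earlier updates to the same key in the batch, and the integral is never GC'd. upsert_with_lateness and its rejection rule (drop updates whose new timestamp is below the bound) are assumptions for illustration.

```rust
use std::collections::BTreeMap;

type Key = u32;
type Ts = u64; // the column with lateness

/// Hypothetical upsert that receives the lateness bound as an input and filters
/// each update before applying it. The integral is kept whole (no GC).
fn upsert_with_lateness(
    integral: &mut BTreeMap<Key, Ts>,
    batch: &[(Key, Ts)],
    bound: Ts,
) -> Vec<((Key, Ts), i64)> {
    // Remember the pre-batch value of every touched key.
    let old: BTreeMap<Key, Option<Ts>> =
        batch.iter().map(|&(k, _)| (k, integral.get(&k).copied())).collect();
    for &(k, v) in batch {
        if v < bound {
            continue; // late update: ignored; earlier updates to k stay in effect
        }
        integral.insert(k, v);
    }
    // Emit the net change per touched key.
    let mut delta = Vec::new();
    for (k, old_v) in old {
        let new_v = integral.get(&k).copied();
        if old_v != new_v {
            if let Some(v) = old_v { delta.push(((k, v), -1)); }
            if let Some(v) = new_v { delta.push(((k, v), 1)); }
        }
    }
    delta
}

fn main() {
    let mut integral: BTreeMap<Key, Ts> = BTreeMap::from([(1, 100)]);
    // Same batch as before: a valid update (120) followed by a late one (50).
    let delta = upsert_with_lateness(&mut integral, &[(1, 120), (1, 50)], 90);
    // The old record is replaced by the valid update; the late one is dropped.
    assert_eq!(delta, vec![((1, 100), -1), ((1, 120), 1)]);
    assert_eq!(integral.get(&1), Some(&120));
}
```

Because the check happens per update inside upsert, the operator's state and its output stay consistent with each other. No downstream filter is needed.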