Refactor window execution to enable future optimizations #23273

Description

@2010YOUY01

Is your feature request related to a problem or challenge?

The proposed refactor in this issue can make window execution implementation simpler and more extensible. I think it is a necessary step if we want to invest further in better vectorization or more parallel execution paradigms mentioned in the issue:

The existing structure is not ideal: if we continue evolving it in its current form, future optimization work will likely require more special cases, making the implementation even harder to maintain and extend.

To sanity-check whether this refactor makes sense, we can run a few thought experiments using the potential optimizations mentioned in the issue. Examples include better parallelism and vectorization for fixed frames, segment-tree-based parallelism, etc. These optimizations are natural extensions of the ideal architecture introduced by this issue, but they are hard to add cleanly with the existing structure.

This issue explains, in order:

  • The issues in the existing implementation
  • How an ideal structure should look
  • A possible implementation plan

Issue with existing implementation

An ideal architecture requires a clean separation between the logical and physical layers of window function execution.

  • The logical layer describes what should be calculated (the expressions for the window frame, partitioning, ordering, and so on).
  • The physical layer answers 'how' by implementing the required methods (APIs that take the input batch and calculate the window expression output).

The major issue is that the existing abstraction layers leak into adjacent layers. The figure below lists several trait methods that currently sit in the wrong abstraction layer:

Image

I think the original design goal was:

  • WindowExpr is supposed to be the logical layer.
  • PartitionEvaluator is supposed to be the physical layer.

Over time, however, these responsibilities have become mixed. The decision-making flow has become bidirectional, and the implementation now relies on special cases to work around abstraction leaks.

My guess is that these are mostly hacks accumulated over the years. I cannot find a strong reason to preserve this design.

The ideal design should place each abstraction at the right layer, making the implementation simpler and easier to extend.

Image

Ideal Architecture

The gist is that we should fully separate the logical and physical layers of window execution.

  • Logical layer: WindowCall purely describes what we want to calculate. It contains the expressions for arguments, partitioning, ordering, and frame bounds.
  • Physical layer: WindowKernel purely provides the methods needed for execution. It represents the selected execution algorithm for a specific window call.

This design brings the following benefits:

  • Simplicity: the control flow is one-directional. WindowCall decides which window kernel to use, and the window kernel purely provides methods for execution.
  • Extensibility: adding a new parallelism scheme or improving a vectorized fast path means adding one window kernel; no deep structural changes are needed.

Workflow

SQL / logical physical planning
  -> WindowCall              // pure description: function, args, partition/order/frame
  -> WindowKernel selection  // physical execution protocol chosen from shape + capabilities
  -> WindowExec              // execution routing: choose stream based on selected kernel
      -> NaiveAccumulatorStream
      -> SlidingAccumulatorStream
      -> other specialized streams

In rough terms:

/// pure description: function, args, partition/order/frame
struct WindowCall {
    name: String,
    field: FieldRef,
    function: WindowFunctionKind,
    args: Vec<Arc<dyn PhysicalExpr>>,
    filter: Option<Arc<dyn PhysicalExpr>>,
    partition_by: Vec<Arc<dyn PhysicalExpr>>,
    order_by: Vec<PhysicalSortExpr>,
    frame: Arc<WindowFrame>,
    options: WindowOptions,
}

/// pure execution: provides the methods needed for a specific path
enum WindowKernel {
    /// Derived from existing Accumulator without `retract_batch`
    /// A nested-loop algorithm will be used.
    NaiveAccumulator(Box<dyn NaiveAccumulatorWindowKernel>),
    /// Derived from existing Accumulator with `retract_batch`
    /// A sliding-window algorithm will be used.
    SlidingAccumulator(Box<dyn SlidingAccumulatorWindowKernel>),
    /// More variants can be added for future optimizations
    /// ...
}

DataFusion's existing Accumulator API already contains the primitives for two useful aggregate window algorithms:

  • update_batch() plus evaluate() can recompute a result for any frame. This supports a naive nested-loop fallback for all accumulators.
  • retract_batch() plus supports_retract_batch() allow incremental sliding-window execution when rows leave the frame.
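
To make the two sets of primitives concrete, here is a minimal sketch using a toy trait over `f64` slices. `ToyAccumulator`, `AvgAccumulator`, and `MaxAccumulator` are illustrative names, not DataFusion types; the real `Accumulator` works on Arrow arrays and returns `Result`, which is omitted here for brevity.

```rust
/// Toy stand-in for an accumulator (assumption: simplified to `f64` slices).
trait ToyAccumulator {
    /// Add rows to the running state.
    fn update_batch(&mut self, values: &[f64]);
    /// Produce the result for the rows currently in the state.
    fn evaluate(&self) -> Option<f64>;
    /// Capability flag: can rows be removed again?
    fn supports_retract_batch(&self) -> bool {
        false
    }
    /// Remove rows that were previously added. Optional capability.
    fn retract_batch(&mut self, _values: &[f64]) {
        unimplemented!("this accumulator cannot retract rows");
    }
}

/// Average keeps (sum, count), so removing rows is cheap.
#[derive(Default)]
struct AvgAccumulator {
    sum: f64,
    count: usize,
}

impl ToyAccumulator for AvgAccumulator {
    fn update_batch(&mut self, values: &[f64]) {
        self.sum += values.iter().sum::<f64>();
        self.count += values.len();
    }
    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
    fn supports_retract_batch(&self) -> bool {
        true
    }
    fn retract_batch(&mut self, values: &[f64]) {
        // Assumes the caller only retracts rows it previously added.
        self.sum -= values.iter().sum::<f64>();
        self.count -= values.len();
    }
}

/// A max that stores only the current maximum cannot undo an update,
/// so it keeps the default "no retract" capability.
#[derive(Default)]
struct MaxAccumulator {
    max: Option<f64>,
}

impl ToyAccumulator for MaxAccumulator {
    fn update_batch(&mut self, values: &[f64]) {
        for &v in values {
            self.max = Some(self.max.map_or(v, |m| m.max(v)));
        }
    }
    fn evaluate(&self) -> Option<f64> {
        self.max
    }
}
```

In this sketch, `AvgAccumulator` would qualify for a sliding path, while `MaxAccumulator` would only be usable through the naive recompute path.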

If the accumulator does not support retract_batch(), a naive nested-loop evaluation can be used. If retract_batch() is supported and the window frame is a fixed sliding frame, a sliding-window algorithm can be used for optimization.

The implication for a newly added user-defined window function is that it only needs to support the naive method to work universally (for an aggregate function used as a window function, that means only update_batch() and evaluate(), as in the naive path above). It can optionally support faster paths (retract_batch() for sliding windows, or even a vectorized API in the future), and the optimizer/execution will route the call to a fast path when the query expression shape allows.
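
The routing rule described above could be sketched roughly as follows. This is only an illustration of the decision, assuming a simplified frame description: `KernelChoice`, `FrameBound`, `FrameShape`, and `select_kernel` are hypothetical names, not existing DataFusion APIs.

```rust
/// Which execution protocol the planner picked (hypothetical).
#[derive(Debug, PartialEq)]
enum KernelChoice {
    NaiveAccumulator,
    SlidingAccumulator,
}

/// Simplified frame bound (assumption: only the cases used in this issue).
#[derive(Debug, Clone, Copy)]
enum FrameBound {
    /// Constant offset known at planning time, e.g. `2 PRECEDING`.
    ConstantPreceding(u64),
    /// `CURRENT ROW`.
    CurrentRow,
    /// Offset computed per row from an expression, e.g. `t.n_gap PRECEDING`.
    PerRowExpression,
}

struct FrameShape {
    start: FrameBound,
    end: FrameBound,
}

impl FrameShape {
    /// A frame is treated as "fixed sliding" here when neither bound
    /// depends on a per-row expression.
    fn is_fixed_sliding(&self) -> bool {
        !matches!(self.start, FrameBound::PerRowExpression)
            && !matches!(self.end, FrameBound::PerRowExpression)
    }
}

/// One-directional decision: capabilities + frame shape -> kernel.
fn select_kernel(supports_retract_batch: bool, frame: &FrameShape) -> KernelChoice {
    if supports_retract_batch && frame.is_fixed_sliding() {
        KernelChoice::SlidingAccumulator
    } else {
        // The naive path only needs update_batch() and evaluate(),
        // so it is always a valid fallback.
        KernelChoice::NaiveAccumulator
    }
}
```

The point of the sketch is that the fallback is always available, so a fast path is purely an opt-in capability.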

Here is a simple example to walk through the above workflow.

Workload 1: Sliding Aggregate

Example query:

SELECT
  avg(x) OVER (
    PARTITION BY k
    ORDER BY ts
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
  ) AS avg_x
FROM t;

Planning:

  1. WindowCall holds the logical description: avg(x), PARTITION BY k, ORDER BY ts, and ROWS BETWEEN 2 PRECEDING AND CURRENT ROW.
  2. The planner sees that this is an aggregate window over a fixed moving frame.
  3. The planner asks the aggregate accumulator whether it supports retract_batch(). avg does.
  4. The planner chooses SlidingAccumulatorWindowKernel.
  5. WindowAggExec routes execution to a dedicated SlidingAccumulatorStream, because the selected kernel has the sliding-window execution protocol.

The kernel API can stay small because it only represents one physical protocol:

trait SlidingAccumulatorWindowKernel {
    fn evaluate_partition(
        &mut self,
        input: &PartitionWindowInput<'_>,
        frame: &FrameIndex,
    ) -> Result<ArrayRef>;
}

struct PartitionWindowInput<'a> {
    batch: &'a RecordBatch,
    args: Vec<ArrayRef>,
    filter: Option<BooleanArray>,
}

Very rough sliding-window algorithm sketch:

acc = create_avg_accumulator()
current_frame = range(0, 0)
output = []

for row_idx in partition_rows:
    next_frame = frame_for(row_idx)

    # Rows that were in the previous frame but are not in the next frame.
    leaving = current_frame.start .. next_frame.start
    if leaving is not empty:
        acc.retract_batch(values_for(leaving))

    # Rows that are in the next frame but were not in the previous frame.
    entering = current_frame.end .. next_frame.end
    if entering is not empty:
        acc.update_batch(values_for(entering))

    output.append(acc.evaluate())
    current_frame = next_frame

This is the fast path: each input row is added and removed at most once, so the cost is linear in the partition size for row-based fixed frames.
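
For reference, the sketch above can be turned into a small runnable Rust version. It is a simplification under stated assumptions: values are a plain `f64` slice for one partition, the frame is `ROWS BETWEEN preceding PRECEDING AND CURRENT ROW`, and `RunningAvg`, `frame_for`, and `sliding_avg` are illustrative names rather than proposed APIs.

```rust
use std::ops::Range;

/// Half-open row range for `ROWS BETWEEN preceding PRECEDING AND CURRENT ROW`.
fn frame_for(row_idx: usize, preceding: usize) -> Range<usize> {
    row_idx.saturating_sub(preceding)..row_idx + 1
}

/// Toy avg state with the retract capability.
struct RunningAvg {
    sum: f64,
    count: usize,
}

impl RunningAvg {
    fn update_batch(&mut self, values: &[f64]) {
        self.sum += values.iter().sum::<f64>();
        self.count += values.len();
    }
    fn retract_batch(&mut self, values: &[f64]) {
        self.sum -= values.iter().sum::<f64>();
        self.count -= values.len();
    }
    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// Sliding-window avg over one partition; each row enters and leaves once.
fn sliding_avg(values: &[f64], preceding: usize) -> Vec<Option<f64>> {
    let mut acc = RunningAvg { sum: 0.0, count: 0 };
    let mut current = 0..0;
    let mut output = Vec::with_capacity(values.len());

    for row_idx in 0..values.len() {
        let next = frame_for(row_idx, preceding);

        // Rows in the previous frame that are not in the next frame.
        acc.retract_batch(&values[current.start..next.start]);
        // Rows in the next frame that were not in the previous frame.
        acc.update_batch(&values[current.end..next.end]);

        output.push(acc.evaluate());
        current = next;
    }
    output
}
```

With `values = [1.0, 2.0, 3.0, 4.0]` and `preceding = 2`, the frames are rows 0..1, 0..2, 0..3, and 1..4, so only the last step retracts a row.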

Workload 2: Naive Aggregate Fallback

Example query:

SELECT
  my_udaf(x) OVER (
    PARTITION BY k
    ORDER BY ts
    ROWS BETWEEN t.n_gap PRECEDING AND CURRENT ROW
  ) AS v
FROM t;

Assume my_udaf is a user-defined aggregate whose accumulator supports update_batch() and evaluate(), but not retract_batch(). In addition, the frame start t.n_gap PRECEDING can take an arbitrary value on each row, which the sliding-window algorithm does not support.

Planning:

  1. WindowCall holds the logical description: my_udaf(x), PARTITION BY k, ORDER BY ts, and ROWS BETWEEN t.n_gap PRECEDING AND CURRENT ROW.
  2. The planner sees that this is an aggregate window whose accumulator lacks the retract_batch() capability, and that the frame is not a fixed moving frame.
  3. The planner chooses NaiveAccumulatorWindowKernel.
  4. WindowAggExec routes execution to a dedicated NaiveAccumulatorStream.

The kernel API can again stay small:

trait NaiveAccumulatorWindowKernel {
    fn evaluate_partition(
        &self,
        input: &PartitionWindowInput<'_>,
        frame: &FrameIndex,
    ) -> Result<ArrayRef>;
}

Naive nested-loop algorithm sketch:

output = []

for row_idx in partition_rows:
    frame = frame_for(row_idx)

    # This is slower, but it only needs update_batch() and evaluate().
    acc = create_my_udaf_accumulator()
    acc.update_batch(values_for(frame))

    output.append(acc.evaluate())
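
A runnable Rust version of the naive sketch, under the same simplifying assumptions as before: one partition as an `f64` slice, a per-row `n_gap` slice standing in for `t.n_gap`, and a toy max accumulator standing in for `my_udaf`. `UpdateOnlyAccumulator`, `MaxAccumulator`, and `naive_window` are illustrative names only.

```rust
/// Toy accumulator with only the two required primitives.
trait UpdateOnlyAccumulator {
    fn update_batch(&mut self, values: &[f64]);
    fn evaluate(&self) -> Option<f64>;
}

/// Stand-in for `my_udaf`: a max that cannot retract rows.
#[derive(Default)]
struct MaxAccumulator {
    max: Option<f64>,
}

impl UpdateOnlyAccumulator for MaxAccumulator {
    fn update_batch(&mut self, values: &[f64]) {
        for &v in values {
            self.max = Some(self.max.map_or(v, |m| m.max(v)));
        }
    }
    fn evaluate(&self) -> Option<f64> {
        self.max
    }
}

/// Nested-loop evaluation: a fresh accumulator per output row.
/// `n_gap[i]` is the per-row `PRECEDING` offset for row `i`.
fn naive_window<A: UpdateOnlyAccumulator>(
    values: &[f64],
    n_gap: &[usize],
    make_acc: impl Fn() -> A,
) -> Vec<Option<f64>> {
    assert_eq!(values.len(), n_gap.len(), "one offset per row is required");

    (0..values.len())
        .map(|row_idx| {
            // Frame: `n_gap[row_idx] PRECEDING` through `CURRENT ROW`.
            let frame = row_idx.saturating_sub(n_gap[row_idx])..row_idx + 1;

            // Slower, but it only needs update_batch() and evaluate().
            let mut acc = make_acc();
            acc.update_batch(&values[frame]);
            acc.evaluate()
        })
        .collect()
}
```

Because every frame is recomputed from scratch, the cost grows with the sum of the frame sizes, but the frames are free to jump around arbitrarily from row to row.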

Implementation Plan

I plan to do some prototyping to work out a practical refactoring plan. The known goals are:

  • Remove all three WindowExpr implementations and use WindowCall as the pure logical layer.
  • Replace PartitionEvaluator with WindowKernel.
    • PartitionEvaluator is now a large trait that uses 3+ flags to decide behavior. I think it is hard to use and extend; small, focused traits inside WindowKernel enum variants should be better.
    • Provide an adapter like WindowKernel::LegacyPartitionEvaluator to make the refactor practical.
  • Evolve WindowAggExec in this direction and avoid changing BoundedWindowAggExec.
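
One possible shape for the adapter idea, as a very rough sketch: the legacy evaluator is wrapped in its own enum variant so that execution code only ever matches on `WindowKernel`. Everything here is hypothetical and simplified to `f64` slices; `LegacyEvaluator` is a toy stand-in and does not mirror the real PartitionEvaluator method set.

```rust
/// Toy stand-in for an existing evaluator (assumption: one method only).
trait LegacyEvaluator {
    fn evaluate_all(&mut self, values: &[f64]) -> Vec<f64>;
}

/// Toy stand-in for a small, focused kernel trait.
trait FocusedKernel {
    fn evaluate_partition(&self, values: &[f64]) -> Vec<f64>;
}

enum WindowKernel {
    /// A new-style kernel.
    Focused(Box<dyn FocusedKernel>),
    /// Adapter variant: keeps existing evaluators working during migration.
    LegacyPartitionEvaluator(Box<dyn LegacyEvaluator>),
}

impl WindowKernel {
    /// Execution only matches on the kernel; it never inspects flags.
    fn evaluate_partition(&mut self, values: &[f64]) -> Vec<f64> {
        match self {
            WindowKernel::Focused(kernel) => kernel.evaluate_partition(values),
            WindowKernel::LegacyPartitionEvaluator(eval) => eval.evaluate_all(values),
        }
    }
}

/// Example legacy evaluator: row_number().
struct RowNumber;

impl LegacyEvaluator for RowNumber {
    fn evaluate_all(&mut self, values: &[f64]) -> Vec<f64> {
        (1..=values.len()).map(|i| i as f64).collect()
    }
}

/// Example new-style kernel: running sum over the partition.
struct RunningSum;

impl FocusedKernel for RunningSum {
    fn evaluate_partition(&self, values: &[f64]) -> Vec<f64> {
        let mut total = 0.0;
        values
            .iter()
            .map(|v| {
                total += v;
                total
            })
            .collect()
    }
}
```

With an adapter variant like this, existing evaluators could be migrated one at a time, and the variant removed once nothing depends on it.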
