Paul White

How Parallel Plans Start Up – Part 2

This is the second part of a five-part series taking a deep dive into the way SQL Server row mode parallel plans start up. By the end of the first part, we had created execution context zero for the parent task. This context contains the entire tree of executable operators, but they are not yet ready for the iterative execution model of the query processing engine.

Iterative Execution

SQL Server executes a query through a process known as a query scan. Plan initialization starts at the root, with the query processor calling Open on the root node. The Open call traverses the tree of iterators, recursively calling Open on each child, until the whole tree is opened.

The process of returning result rows is also recursive, triggered by the query processor calling GetRow at the root. Each call to the root returns one row. The query processor continues calling GetRow on the root node until no more rows are available. Execution shuts down with a final recursive Close call. This arrangement allows the query processor to initialize, execute, and close down any arbitrary plan by calling the same interface methods only at the root.
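
As a concrete illustration of this interface, here is a minimal C++ sketch of the iterative model just described. The type and function names are invented for illustration; they are not SQL Server's internal classes, which are introduced below.

    #include <optional>

    // Placeholder for whatever a row looks like internally.
    struct Row { /* column values */ };

    // Every node in the query scan tree exposes the same three methods.
    class QueryScanIterator
    {
    public:
        virtual ~QueryScanIterator() = default;
        virtual void Open() = 0;                  // prepare this node and, recursively, its children
        virtual std::optional<Row> GetRow() = 0;  // produce the next row, or nothing when done
        virtual void Close() = 0;                 // recursive shutdown and resource release
    };

    // The query processor drives any plan, however complex, purely through the root.
    void RunQueryScan(QueryScanIterator& root)
    {
        root.Open();                              // recursive Open of the whole tree
        while (auto row = root.GetRow())          // one result row per call
        {
            // ... return *row to the client ...
        }
        root.Close();                             // final recursive Close
    }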

To transform the tree of executable operators to one suitable for row-by-row processing, SQL Server adds a query scan wrapper to each operator. The query scan object provides the Open, GetRow, and Close methods needed for iterative execution.

The query scan object also maintains state information, and exposes other operator-specific methods needed during execution. For example, the query scan object for a Start-Up Filter operator (CQScanStartupFilterNew) exposes the following methods:

  • Open
  • GetRow
  • Close
  • PrepRecompute
  • GetScrollLock
  • SetMarker
  • GotoMarker
  • GotoLocation
  • ReverseDirection
  • Dormant

The additional methods for this iterator are mostly employed in cursor plans.

Initializing the Query Scan

The wrapping process is called initializing the query scan. It is performed by a call from the query processor to CQueryScan::InitQScanRoot. The parent task performs this process for the whole plan (contained within execution context zero). The wrapping is itself recursive, starting at the root and working its way down the tree.

During this process, each operator is responsible for initializing its own data and creating any runtime resources it needs. This may include creating additional objects outside of the query processor, for example the structures needed to communicate with the storage engine to fetch data from persistent storage.
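
A rough sketch of what such a top-down wrapping pass might look like follows. The names are invented; the real work is done by CQueryScan::InitQScanRoot and the QScanGet methods on each operator class.

    #include <memory>
    #include <vector>

    // Invented stand-ins: Operator plays the role of the executable CXte* nodes,
    // QueryScanNode the role of the CQScan* wrappers created for them.
    struct QueryScanNode
    {
        std::vector<std::unique_ptr<QueryScanNode>> children;
        // Open/GetRow/Close and operator-specific runtime state would live here.
    };

    struct Operator
    {
        std::vector<std::unique_ptr<Operator>> children;

        // Each operator produces its own wrapper, acquiring any runtime
        // resources it needs (storage engine structures, exchange ports, ...).
        std::unique_ptr<QueryScanNode> MakeQueryScan() const
        {
            return std::make_unique<QueryScanNode>();
        }
    };

    // Recursive pass in the spirit of initializing the query scan:
    // wrap the current operator, then descend into its children.
    std::unique_ptr<QueryScanNode> InitQueryScan(const Operator& op)
    {
        auto scan = op.MakeQueryScan();
        for (const auto& child : op.children)
            scan->children.push_back(InitQueryScan(*child));
        return scan;
    }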

A reminder of the execution plan, with node numbers added (click to enlarge):

Plan with node ids

The operator at the root (node 0) of the executable plan tree is a sequence project. It is represented by a class named CXteSeqProject. As usual, this is where the recursive transformation starts.

Query scan wrappers

As mentioned, the CXteSeqProject object is not equipped to take part in the iterative query scan process — it does not have the required Open, GetRow, and Close methods. The query processor needs a wrapper around the executable operator to provide that interface.

To get that query scan wrapper, the parent task calls CXteSeqProject::QScanGet to return an object of type CQScanSeqProjectNew. The linked map of operators created earlier is updated to reference the new query scan object, and its iterator methods are connected to the root of the plan.

The child of the sequence project is a segment operator (node 1). Calling CXteSegment::QScanGet returns a query scan wrapper object of type CQScanSegmentNew. The linked map is again updated, and iterator function pointers connected up to the parent sequence project query scan.

Half an exchange

The next operator is a gather streams exchange (node 2). Calling CXteExchange::QScanGet returns a CQScanExchangeNew as you might be expecting by now.

This is the first operator in the tree that needs to perform significant extra initialization. It creates the consumer side of the exchange via CXTransport::CreateConsumerPart. This creates the port (CXPort) — a data structure in shared memory used for synchronization and data exchange — and a pipe (CXPipe) for packet transport. Note that the producer side of the exchange is not created at this time. We only have half an exchange!
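
Conceptually, the half-built exchange might be pictured as below. The structures are invented for this sketch; the real CXPort and CXPipe are internal shared-memory objects, and only the consumer side exists at this point.

    #include <memory>
    #include <vector>

    // Invented stand-ins for the internal exchange structures.
    struct XPipe { /* packet transport between two threads */ };
    struct XPort { /* shared synchronization and data exchange state */ };

    struct ExchangeConsumerSide
    {
        std::shared_ptr<XPort> port;                // will also be used by the future producer side
        std::vector<std::unique_ptr<XPipe>> pipes;  // connections to the consuming thread(s)
    };

    // Roughly what "create the consumer part" amounts to: allocate the port and
    // the consumer-side pipes, and nothing at all on the producer side (yet).
    // The gather streams consumer here feeds the single parent task, so one pipe;
    // the repartition streams consumer seen shortly creates DOP (= 2) pipes,
    // one per consuming thread.
    ExchangeConsumerSide CreateConsumerPart(int pipeCount)
    {
        ExchangeConsumerSide consumer;
        consumer.port = std::make_shared<XPort>();
        for (int i = 0; i < pipeCount; ++i)
            consumer.pipes.push_back(std::make_unique<XPipe>());
        return consumer;                            // the producer side arrives in Part 3
    }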

More wrapping

The process of setting up the query processor scan then continues with the merge join (node 3). I won’t always repeat the QScanGet and CQScan* calls from this point on, but they follow the established pattern.

The merge join has two children. Query scan setup continues as before with the outer (top) input — a stream aggregate (node 4), then a repartition streams exchange (node 5). The repartition streams again creates only the consumer side of the exchange, but this time there are two pipes created because DOP is two. The consumer side of this type of exchange has DOP connections to its parent operator (one per thread).

Next we have another stream aggregate (node 6) and a sort (node 7). The sort has a child not visible in execution plans: a storage engine rowset used to implement spilling to tempdb. The expected CQScanSortNew is therefore accompanied by a child CQScanRowsetNew in the internal tree, which is likewise invisible in showplan output.

I/O profiling and deferred operations

The sort operator is also the first initialized so far that might be responsible for I/O. Assuming I/O profiling data has been requested (for example, by asking for an ‘actual’ plan), the sort creates an object to record this runtime profiling data via CProfileInfo::AllocProfileIO.

The next operator is a compute scalar (node 8), called a project internally. The query scan setup call to CXteProject::QScanGet does not return a query scan object, because the calculations performed by this compute scalar are deferred to the first parent operator that needs the result. In this plan, that operator is the sort. The sort will do all the work assigned to the compute scalar, so the project at node 8 does not form part of the query scan tree. The compute scalar really isn’t executed at runtime. For more detail on deferred compute scalars, see Compute Scalars, Expressions and Execution Plan Performance.
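
Conceptually, deferral means the compute scalar's expression definitions are handed to the consuming operator instead of becoming an iterator of their own. A loose sketch of the idea (invented names; the real work goes through the expression service, as we will see when the sort initializes):

    #include <functional>
    #include <vector>

    struct Row { std::vector<double> columns; };

    // An expression the compute scalar would have produced, e.g. a derived column.
    using ScalarExpression = std::function<double(const Row&)>;

    // The consuming operator (the sort, in this plan) carries the deferred
    // expressions and evaluates them only when it first needs the results.
    struct ConsumingOperator
    {
        std::vector<ScalarExpression> deferredExpressions;

        void ConsumeRow(Row& row)
        {
            for (const auto& expr : deferredExpressions)
                row.columns.push_back(expr(row));   // evaluated here, not at node 8
            // ... then use the widened row (e.g. add it to the sort input) ...
        }
    };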

Parallel scan

The final operator after the compute scalar on this branch of the plan is an index seek (CXteRange) at node 9. This produces the expected query scan operator (CQScanRangeNew), but it also requires a complex sequence of initializations to connect to the storage engine and facilitate a parallel scan of the index.

Just covering the highlights, initializing the index seek:

  • Creates a profiling object for I/O (CProfileInfo::AllocProfileIO).
  • Creates a parallel rowset query scan (CQScanRowsetNew::ParallelGetRowset).
  • Sets up a synchronization object to coordinate the runtime parallel range scan (CQScanRangeNew::GetSyncInfo); a conceptual sketch of this idea follows the list.
  • Creates the storage engine table cursor and a read-only transaction descriptor.
  • Opens the parent rowset for reading (accessing the HoBt and taking the needed latches).
  • Sets the lock timeout.
  • Sets up prefetching (including associated memory buffers).
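
As promised above, here is a purely conceptual sketch of what a synchronization object for a parallel range scan does: worker threads claim the next available portion of the index on demand, so the work is shared without any fixed up-front division. None of this is SQL Server code; the real object set up by CQScanRangeNew::GetSyncInfo is considerably more sophisticated.

    #include <atomic>
    #include <cstdint>
    #include <optional>

    // Conceptual only: shared state from which each worker thread claims the
    // next sub-range of the index when it is ready for more work.
    class ParallelRangeScanSync
    {
        std::atomic<std::uint64_t> nextRange{0};
        std::uint64_t totalRanges;

    public:
        explicit ParallelRangeScanSync(std::uint64_t ranges) : totalRanges(ranges) {}

        // Called repeatedly by each worker; ranges are handed out on demand,
        // so a faster thread simply ends up scanning more of the index.
        std::optional<std::uint64_t> ClaimNextRange()
        {
            const auto mine = nextRange.fetch_add(1);
            if (mine >= totalRanges)
                return std::nullopt;               // nothing left to scan
            return mine;
        }
    };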

Adding row mode profiling operators

We have now reached the leaf level of this branch of the plan (the index seek has no child). Having just created the query scan object for the index seek, the next step is to wrap the query scan with a profiling class (assuming we requested an actual plan). This is done by a call to sqlmin!PqsWrapQScan. Note that profilers are added after the query scan has been created, as we begin to ascend the iterator tree.

PqsWrapQScan creates a new profiling operator as a parent of the index seek, via a call to CProfileInfo::GetOrCreateProfileInfo. The profiling operator (CQScanProfileNew) has the usual query scan interface methods. As well as collecting the data needed for actual plans, the profiling data is also exposed via the DMV sys.dm_exec_query_profiles.
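
To make the wrapping concrete, here is a minimal sketch of a profiling wrapper in the spirit of CQScanProfileNew. The names are invented and std::chrono stands in for the Windows query performance counter API mentioned shortly; the real profiler collects considerably more than row counts and active times.

    #include <chrono>
    #include <cstdint>
    #include <memory>
    #include <optional>

    struct Row {};  // placeholder row type

    // Minimal iterator interface (same shape as the earlier sketch).
    class QueryScanIterator
    {
    public:
        virtual ~QueryScanIterator() = default;
        virtual void Open() = 0;
        virtual std::optional<Row> GetRow() = 0;
        virtual void Close() = 0;
    };

    // A profiling wrapper: presents the same interface as the node it wraps,
    // forwards every call, and records the sort of metrics exposed through
    // sys.dm_exec_query_profiles (row count, first/last active time, ...).
    class ProfilingScan final : public QueryScanIterator
    {
        using Clock = std::chrono::steady_clock;

        std::unique_ptr<QueryScanIterator> wrapped;
        std::uint64_t rowCount = 0;
        Clock::time_point firstActive{};
        Clock::time_point lastActive{};

        void Touch()
        {
            const auto now = Clock::now();
            if (firstActive == Clock::time_point{})
                firstActive = now;    // recorded once, when the wrapper first runs
            lastActive = now;         // refreshed every time this iterator's code runs
        }

    public:
        explicit ProfilingScan(std::unique_ptr<QueryScanIterator> inner)
            : wrapped(std::move(inner)) {}

        void Open() override { Touch(); wrapped->Open(); }

        std::optional<Row> GetRow() override
        {
            Touch();
            auto row = wrapped->GetRow();
            if (row) ++rowCount;      // actual row count for 'actual' plans / the DMV
            return row;
        }

        void Close() override { Touch(); wrapped->Close(); }
    };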

Querying that DMV at this precise moment in time for the current session shows that only a single plan operator (node 9) exists (meaning it is the only one wrapped by a profiler):

Query profiles DMV

This screenshot shows the complete result set from the DMV at the current moment (it has not been edited).

Next, CQScanProfileNew calls the query performance counter API (KERNEL32!QueryPerformanceCounterStub) provided by the operating system to record the first and last active times of the profiled operator:

First active time

Last active time

The last active time will be updated using the query performance counter API every time code for that iterator runs.

The profiler then sets the estimated number of rows at this point in the plan (CProfileInfo::SetCardExpectedRows), accounting for any row goal (CXte::CardGetRowGoal). Since this is a parallel plan, it divides the result by the number of threads (CXte::FGetRowGoalDefinedForOneThread) and saves the result in the execution context.
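
In other words, the figure stored per thread is just the plan-level estimate shared across the workers, something like this (illustrative only; the real calculation in CXte::CardGetRowGoal also handles row goals and other adjustments):

    // Illustrative only: the per-thread estimate saved into the execution
    // context for a parallel plan operator.
    double PerThreadEstimate(double estimateAfterRowGoal, int degreeOfParallelism)
    {
        return estimateAfterRowGoal / degreeOfParallelism;
    }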

The estimated number of rows is not visible via the DMV at this point, because the parent task will not execute this operator. Instead, the per-thread estimate will be exposed later in parallel execution contexts (which have not been created yet). Nevertheless, the per-thread number is saved in the parent task’s profiler; it just isn’t visible through the DMV.

The friendly name of the plan operator (“Index Seek”) is then set via a call to CXteRange::GetPhysicalOp:

Physical operator name

Earlier, you may have noticed that querying the DMV showed the name as "???". That is the name shown permanently for invisible operators (e.g. nested loops prefetch, batch sort), which have no friendly name defined.

Finally, index metadata and current I/O statistics for the wrapped index seek are added via a call to CQScanRowsetNew::GetIoCounters:

I/O statistics

The counters are zero at the moment, but will be updated as the index seek performs I/O once the finished plan starts executing.

More query scan processing

With the profiling operator created for the index seek, query scan processing moves back up the tree to the parent sort (node 7).

The sort performs the following initialization tasks (outlined in a sketch after the list):

  • Registers its memory usage with the query memory manager (CQryMemManager::RegisterMemUsage).
  • Calculates the memory required for the sort input (CQScanIndexSortNew::CbufInputMemory) and output (CQScanSortNew::CbufOutputMemory).
  • Creates the sort table, along with its associated storage engine rowset (sqlmin!RowsetSorted).
  • Creates a standalone system transaction (not bounded by the user transaction) for sort spill disk allocations, along with a fake work table (sqlmin!CreateFakeWorkTable).
  • Initializes the expression service (sqlTsEs!CEsRuntime::Startup) so the sort can perform the calculations deferred from the compute scalar.
  • Sets up prefetch for any sort runs spilled to tempdb (CPrefetchMgr::SetupPrefetch).
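
The sketch below lays those steps out in order. Every name and the half-and-half memory split are invented for illustration; the comments point back to the real internal routines listed above.

    #include <cstddef>

    // Outline only: the sort operator's start-up sequence with invented helpers.
    struct SortRuntime
    {
        std::size_t inputBufferBytes  = 0;
        std::size_t outputBufferBytes = 0;
        bool spillTransactionCreated  = false;
        bool expressionServiceReady   = false;
        bool prefetchConfigured       = false;
    };

    SortRuntime InitializeSort(std::size_t grantedMemoryBytes)
    {
        SortRuntime sort;

        // Register usage with the query memory manager (RegisterMemUsage), then
        // size the input and output buffers (CbufInputMemory / CbufOutputMemory).
        sort.inputBufferBytes  = grantedMemoryBytes / 2;   // split is illustrative only
        sort.outputBufferBytes = grantedMemoryBytes - sort.inputBufferBytes;

        // Create the sort table and its storage engine rowset (RowsetSorted),
        // plus a standalone system transaction and fake work table for spills.
        sort.spillTransactionCreated = true;

        // Start the expression service (CEsRuntime::Startup) so the sort can
        // evaluate the scalars deferred from the compute scalar at node 8.
        sort.expressionServiceReady = true;

        // Configure prefetch for any sort runs spilled to tempdb (SetupPrefetch).
        sort.prefetchConfigured = true;

        return sort;
    }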

Finally, the sort query scan is wrapped by a profiling operator (including I/O) just as we saw for the index seek:

Sort

Notice that the compute scalar (node 8) is missing from the DMV. That is because its work was deferred to the sort, so it is not part of the query scan tree and has no wrapping profiler object.

Moving up to the parent of the sort, the stream aggregate query scan operator (node 6) initializes its expressions and runtime counters (e.g. current group row count). The stream aggregate is wrapped with a profiling operator, recording its initial times:

Stream aggregate

The parent repartition streams exchange (node 5) is wrapped by a profiler (remember only the consumer side of this exchange exists at this point):

Exchange

The same is done for its parent stream aggregate (node 4), which is also initialized as previously described:

Another stream aggregate

The query scan processing returns to the parent merge join (node 3) but does not initialize it yet. Instead, we move down the inner (lower) side of the merge join, performing the same detailed tasks for those operators (nodes 10 to 15) as done for the upper (outer) branch:

Lower branch operators

Once those operators are processed, the merge join query scan is created, initialized, and wrapped with a profiling object. This includes I/O counters because a many-many merge join uses a work table (even though the current merge join is one-many):

Merge join

The same process is followed for the parent gather streams exchange (node 2, consumer side only), the segment (node 1), and the sequence project (node 0) operators. I won’t describe them in detail.

The query profiles DMV now reports a full set of profiler-wrapped query scan nodes:

All executed query nodes

Notice that only the sequence project, segment, and gather streams consumer have an estimated row count, because these operators will be run by the parent task rather than by additional parallel tasks (see CXte::FGetRowGoalDefinedForOneThread earlier). The parent task has no work to do in the parallel branches, so estimated row counts there only make sense for the additional tasks.

The active time values shown above are somewhat distorted because I needed to stop execution and take DMV screenshots at each step. A separate execution (without the artificial delays introduced by using a debugger) produced the following timings:

Active times without delays

The tree is constructed in the same sequence described before, but the process is so quick there is only a 1 microsecond difference between the first wrapped operator’s active time (the index seek at node 9) and the last (sequence project at node 0).

End of Part 2

It might sound like we have done a lot of work, but remember we have only created a query scan tree for the parent task, and the exchanges only have a consumer side (no producer yet). Our parallel plan also only has one thread (as shown in the last screenshot). Part 3 will see the creation of our first additional parallel tasks.