Stream Compaction - Introduction((Markus Billeter et al. Efficient Stream Compaction on Wide SIMD Many-Core Architectures))((InK-Compact-: In kernel Stream Compaction and Its Application to Multi-kernel Data Visualization on GPGPU- D.M. Hughes))
An efficient implementation of stream compaction on GPU is presented with code example. Full source code on github.
Stream compaction/reduction is commonly referred as the operation of removing unwanted elements in a collection. More formally, imagine we have a list of element of N elements and a predicate
that bisects
in wanted and unwanted elements (ones that satisfy the predicates
and ones that don't). The stream compaction of
under
is an array
. Sometimes is it enough to return
s.t. all valid (suppose
) elements are grouped at the first
position of
. Stream reduction is a key building block in several important algorithm especially in the field of parallel programming where is not uncommon to have large and sparse data structure to process (parallel breadht tree traversing, ray tracing etc.) that can lead to performance degradation of the overall algorithm.
As an example, stream compacting with the predicate the following array of twelve integers
would produce
.
Stream Compaction - Serial
Serial implementation in a single thread is straightforward:
template <typename T> void serialCompact (T* input, T*output, uint length, bool (*predicate)(T)){ uint j=0; for(uint i=0;i<length;i++) if(predicate(input[i])){ output[j] = input[i]; j++; } }

Stream Compaction Example: predicate here is ** != white **
It simply fill the output vector with only the valid element, leaving the rest unvalid.
Stream Compaction - Parallel Naive Approach
The parallel implementation is more challenging and the most effective parallel implementations produced so far (Thrust, Chag:pp) are mainly based on the computation of the so called (exclusive or inclusive prefix sum). The exclusive prefix sum on a vector with validity test
consist in producing a vector
s.t.
and
where k is the number of valid element strictly preceding (or up to the element
in case of inclusive prefix sum) the element of
. Inclusive prefix sum can be obtained from exclusive using the following: let
the vector of elements and
its exclusive prefix sum then
is the corresponding inclusive prefix sum.
As a side a more general and rigorous definition of prefix sum (or scan) is the following: given an associative operator , a vector
of
elements and an identity element
, the scan operation returns a vector
.
Suppose is the number of processor and
is the size of the vector. The input stream is divided in sub-streams of size of size
. The parallel algorithm is divided in three distinct phases:
- Each processor
count independently the number of valid elements in its sub-stream and save this value in
- A prefix sum operation is performed on the sub-array
producing the vector
- Each processor can indipendently from the others flush out its valid elements in the correct location (at the correct offset)
//phase1 for each processor p in parallel int count=0; for(int i=0;i<S;i++) if(valid(input_p[i]) count++; procCount[p]=count; //phase 2 procOffset = prefixSum(procCount) //phase3 for each processor p in parallel int j=0; for(int i=0;i<S;i++) if(valid(input_p[i]){ output[procOffset[p] + j ] j++; } }
Note that phase 2 can also be done in parallel and that its implementation on SIMT hardware would require separate discussion (maybe in a future article). We will then use available implementation as the one shipped with the thrust library. In CUDA usually this phase load is negligible with the respect to the other two as the number of processor P is usually several order of magnitude smaller than .
Stream Compaction - CUDA SIMT
GPUs hardware is made of a number of order of tens streaming multiprocessors (SMM), that roughly speaking are SIMD processors made up of streaming processor (the SIMD lanes, SP). The newest Maxwell Architecture introduced by NVidia count 16 SMM and 128 SP per SMM, for a total of 2048 cores.
SMM execute kernels in a SIMD fashion at group of 32 threads (warp) that execute the same instruction and are synchronized. SMM are not scalar processor, and hence, using the parallel approach described above where each SMM is considered as a single scalar processor would result in poor performance due to the large number of idle lanes lanes
.
GPU strategy
In this implementation each SMM is fully used to perform the block counting of valid elements and to finally compact the input.
Phase 1
The following pseudocode shows how block counting can be performed on a SIMT hardware using all lanes in parallel.
//input is a SMM private vector containing the portion of the //original input to be processed by the processor for each SMM processor p in parallel lanesCount[0]=0; ... lanesCount[32]=0; for(int i=0;i<S;i+=32) for each SIMD lane s in parallel if(predicate[input[i+s]) lanesCount[s]++; procCount[p] = reduce(+,lanesCount);
With the introduction of the intrinsic function this phase is both easier (respect to the pesudocode above) to implement and result in a efficient execution. This instruction synchronize threads at block level and take an integer as predicate. It return to *ALL
thread of the block* the number of non zero predicates passed to it by all block's threads.
Suppose for instance that our kernel is executed only by one block made up of four threads and that each thread call __syncthreads_count() as following:
//thread0 int BC=__syncthreads_count(1) //thread1 int BC=__syncthreads_count(0) //thread2 int BC=__syncthreads_count(1) //thread3 int BC=__syncthreads_count(0)
then each thread will own its private copy of the
variable containing the value 2!
This is exaclty what we need in order to count the number of valid elements per block!
The code for the phase 1 is the following:
template <typename T,typename Predicate> __global__ void computeBlockCounts(T* d_input,int length,int*d_BlockCounts,Predicate predicate){ int idx = threadIdx.x + blockIdx.x*blockDim.x; if(idx < length){ int pred = predicate(d_input[idx]); int BC=__syncthreads_count(pred); if(threadIdx.x==0) d_BlockCounts[blockIdx.x]=BC; }
Phase 2
Phase 2 as said before is just all-prefix-sum operation on block counters and for the sake of brevity thrust implementation is used here producing a vector of offsets .
cudaMalloc(&d_BlocksCount,sizeof(int)*numBlocks); cudaMalloc(&d_BlocksOffset,sizeof(int)*numBlocks); thrust::device_ptr<int> thrustPrt_bCount(d_BlocksCount); thrust::device_ptr<int> thrustPrt_bOffset(d_BlocksOffset); //phase1: compute block counters computeBlockCounts<<<numBlocks,blockSize>>>(d_input,length,d_BlocksCount,predicate); //prefix sum thrust call thrust::exclusive_scan(thrustPrt_bCount, thrustPrt_bCount + numBlocks, thrustPrt_bOffset); ...
Phase 3
The most elaborate part of the algorithm takes as input the "to be compacted" stream, and the block offsets computed during the previous 2 phases. It outputs the compacted stream. It is based on the intra warp voting function \__ballot(), the __popc((Note that this require that the machine support a warpsize word. If warpsize will be increased this may be cause compatibility problems.)) procedure and a bit manipulation. Let's start gradually by first throwing light on the __ballot() function.
unsigned int __ballot(int predicate);
evaluates predicate for all threads of the warp and returns an integer whose Nth bit is set if and only if predicate evaluates to non-zero for the Nth thread of the warp.
The __popc(int number) function return the number of set bits in its parameter.
The algorithm works as follows:
- It first compute for each thread a per warp offsets. The offset of the last thread in warp is then the number of valid element in its warp. These two values are stored for subsequent use. (thread offset in a private register
, while warp valid element in shared memory warpTotals[warpIndex]).
int pred= predicate(threadInput); int w_i = threadIdx.x/warpSize; //warp index int w_l = idx % warpSize;//thread index within a warp (warp lane) int t_m = INT_MAX >> (warpSize-w_l-1); int b = __ballot(pred) & t_m; int t_u = __popc(b); if(w_l==warpSize-1) warpTotals[w_i]=t_u + pred; __syncthreads();
where is the warp index within the block,
is the thread index within the warp and
is the thread mask, i.e. a number which the only set bits are the ones with index less then
.
contains only set bits corresponding to the validity of predicated of threads of lower index. __popc is then used to retrieve the number of set bits.
is the warp offset of thread
.
- A block level fence is required in order to ensure shared memory coherency.
if(w_i==0 && w_l<blockDim.x/warpSize){ int w_i_u=0; //log2(warpsize)=5 is warpsize=32 for(int j=0;j<log2(warpsize);j++){ int b_j =__ballot( warpTotals[w_l] & pow2i(j) ); w_i_u += (__popc(b_j & t_m) ) << j; } warpTotals[w_l]=w_i_u; }
- A intra block scan operation is now performed on the the
vector in order to compute a per-block offset. It can be performed by just one warp (assuming the number of warps does not exceed the size of a warp (
). The scan operation is performed in
steps because we are summing up numbers which max value is warpsize (each warp cannot perform more write than its number of threads). Usually in CUDA this number is 32 hence bit scan performed in 5 steps.
In other words is a number in which each bit is one if and only if the
bit of the
per-warp counter is one.
Each warp then mask only the bits (of the warps before it)
and finally sum them up.
Example per-warp offset (4 warps):
Suppose :
warpTotals[0] = 16 =1 0 0 0 0
warpTotals[1] = 18 =1 0 0 1 0
warpTotals[2] = 17 =1 0 0 0 1
warpTotals[3] = 13 =0 1 1 0 1
always output zero because
Step 0:
= __ballot( warpTotals[w_l] & pow2i(0) ) = 12 (column zero)
= __popc(12 & 1) << 0 = 0 << 0 =0
= __popc(12 & 3) << 0 = 0 << 0 =0
= __popc(12 & 7) << 0 = 1 << 0 =1
Step 1:
= __ballot( warpTotals[w_l] & pow2i(1) ) = 2 (column 1)
= __popc(2 & 1) << 1 = 0 << 1 =0
= __popc(2 & 3) << 1 = 1 << 1 =2
= __popc(2 & 7) << 1 = 1 << 1 =2
Step 2:
= __ballot( warpTotals[w_l] & pow2i(2) ) = 8 (column 2)
= __popc(8 & 1) << 1 = 0 << 2 =0
= __popc(8 & 3) << 1 = 0 << 2 =0
= __popc(8 & 7) << 1 = 0 << 2 =0
Step 3:
= __ballot( warpTotals[w_l] & pow2i(3) ) = 8 (column 3)
= __popc(8 & 1) << 1 = 0 << 3 =0
= __popc(8 & 3) << 1 = 0 << 3 =0
= __popc(8 & 7) << 1 = 0 << 3 =0
Step 4:
= __ballot( warpTotals[w_l] & pow2i(4) ) = 7 (column 4)
= __popc(7 & 1) << 4 = 1 << 4 =16
= __popc(7 & 3) << 4 = 2 << 4 =36
= __popc(7 & 7) << 4 = 3 << 4 =48
The final result for each warp is simply the sum of the w_i_u at all steps
- It is now possible knowing the warp, block, and grid offset to flush out the valid element at the correct location. If the current thread is managing a valid element then if flush it out at location: (its offset within its warp) + (its warp offset within block) + (block offset within grid).
if(pred){ d_output[t_u+warpTotals[w_i]+d_BlocksOffset[blockIdx.x]]= d_input[idx]; }
hello.. i'm hafeez.. still in learning cuda. i use numerical method to run parallel algorithm.. anyone can help me??
Hello hafeez nice to hear you're learning cuda. Post some questions, and I'll try to help you. Which numerical method are you currently working on? Is strem compaction a key building block for your parallel algorithm?
This doesn't seem to be working. I tried adapting it to use the output of an Optix Prime ray trace query, and compact the stream down to just those rays that were successful hits. Yet, I am still seeing ray misses in the resulting output. This is for a grid size of 512x512 that subdivides down into 32x32 sets of 16x16 blocks. The counting seems okay, and the block offsets seem okay. So the Compress kernel function seems broken or maybe it is not designed to do what I need?
Feel free to contact me via email at . I need more details in order to be able to help you.
Pingback: Prefix Sum on Vulkan - MYBOOAM