Stream Compaction on GPU - Efficient implementation - CUDA

Stream Compaction - Introduction((Markus Billeter et al., Efficient Stream Compaction on Wide SIMD Many-Core Architectures))((D.M. Hughes et al., InK-Compact: In-kernel Stream Compaction and Its Application to Multi-kernel Data Visualization on GPGPU))


An efficient implementation of stream compaction on GPU is presented, with code examples. The full source code is available on GitHub.

Stream compaction/reduction is commonly referred to as the operation of removing unwanted elements from a collection. More formally, imagine we have a list A_{0\ldots N} of N elements and a predicate p : A \to\left\{ True,False\right\} that bisects A into wanted and unwanted elements (the ones that satisfy the predicate p and the ones that don't). The stream compaction of A under p is an array B=\left\{x \in A \mid p(x) = True\right\}. Sometimes it is enough to return B_{0\ldots N} s.t. all valid elements (suppose there are M \leq N of them) are grouped at the first M positions of B. Stream reduction is a key building block in several important algorithms, especially in the field of parallel programming, where it is not uncommon to have large and sparse data structures to process (parallel breadth-first tree traversal, ray tracing, etc.) whose sparsity can degrade the performance of the overall algorithm.

As an example, stream compacting the following array of twelve integers A=\left\{1,0,0,0,4,3,2,0,6,8,9,0\right\} with the predicate p(x) = x > 0 would produce B=\left\{1,4,3,2,6,8,9\right\}.

Stream Compaction - Serial


The serial implementation in a single thread is straightforward:


template <typename T>
void serialCompact(const T* input, T* output, uint length,
                   bool (*predicate)(T)) {
    uint j = 0;
    for (uint i = 0; i < length; i++)
        if (predicate(input[i])) {
            output[j] = input[i]; // keep only the valid elements
            j++;
        }
}
Stream Compaction Example: the predicate here is *!= white*

It simply fills the output vector with the valid elements only, leaving the rest of the output untouched.
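
A quick usage sketch of the routine above (the isPositive helper and the main function are ours, reusing the twelve-integer example from the introduction):

#include <cstdio>
typedef unsigned int uint;

bool isPositive(int x) { return x > 0; }

int main() {
    int A[12] = {1, 0, 0, 0, 4, 3, 2, 0, 6, 8, 9, 0};
    int B[12] = {0};
    serialCompact(A, B, 12, isPositive);
    // B now starts with {1, 4, 3, 2, 6, 8, 9}; the tail is left as-is
    for (int i = 0; i < 7; i++) printf("%d ", B[i]);
    return 0;
}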

Stream Compaction - Parallel Naive Approach


The parallel implementation is more challenging, and the most effective parallel implementations produced so far (Thrust, Chag:pp) are mainly based on the computation of the so-called prefix sum (exclusive or inclusive). The exclusive prefix sum on a vector V_{0\ldots N} with validity test p consists in producing a vector S_{0\ldots N} s.t. S_0 = 0 and S_i=k, where k is the number of valid elements strictly preceding the element i (or up to and including the element i, in the case of the inclusive prefix sum). For the array A from the introduction with p(x) = x > 0, for instance, the exclusive prefix sum is \left\{0,1,1,1,1,2,3,4,4,5,6,7\right\} and the inclusive one is \left\{1,1,1,1,2,3,4,4,5,6,7,7\right\}. The inclusive prefix sum can be obtained from the exclusive one as follows: let A=\left\{a_0,a_1,\ldots,a_{n-1}\right\} be the vector of elements and E=\left\{0,e_1,\ldots,e_{n-1}\right\} its exclusive prefix sum; then I=\left\{e_1,e_2,\ldots,e_{n-1},e_{n-1}+p(a_{n-1})\right\} is the corresponding inclusive prefix sum (treating p(a_i) as 0 or 1).

As an aside, a more general and rigorous definition of prefix sum (or scan) is the following: given an associative operator \Diamond, a vector V_{0\ldots N} of N elements and an identity element I, the scan operation returns a vector P=\left\{I,v_0,(v_0 \Diamond v_1),(v_0 \Diamond v_1 \Diamond v_2),\ldots,(v_0 \Diamond v_1 \Diamond \ldots \Diamond v_{n-2})\right\}.
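
A minimal serial sketch of that definition (the function name and types are ours; the \Diamond operator becomes the op parameter):

#include <vector>
#include <functional>

// Serial exclusive scan: out[0] = identity,
// out[i] = v[0] op v[1] op ... op v[i-1].
std::vector<int> exclusiveScan(const std::vector<int>& v, int identity,
                               const std::function<int(int, int)>& op) {
    std::vector<int> out(v.size());
    int acc = identity;
    for (size_t i = 0; i < v.size(); ++i) {
        out[i] = acc;        // everything strictly before element i
        acc = op(acc, v[i]); // fold in the current element
    }
    return out;
}
// e.g. exclusiveScan({1, 2, 3, 4}, 0, std::plus<int>()) returns {0, 1, 3, 6}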

Suppose P is the number of processors and N, with N > P, is the size of the vector. The input stream is divided into sub-streams of size S. The parallel algorithm is divided into three distinct phases, sketched in the pseudocode below:

  1. Each processor p_i independently counts the number of valid elements in its sub-stream and saves this value in procCount[p_i]
  2. A prefix sum operation is performed on the sub-array procCount, producing the vector procOffset_{0\ldots P}
  3. Each processor, independently from the others, flushes out its valid elements at the correct location (at the correct offset): output[procOffset[p_i]+currentValidElm]

 


//phase 1
for each processor p in parallel
  int count = 0;
  for(int i = 0; i < S; i++)
    if(valid(input_p[i]))
      count++;
  procCount[p] = count;

//phase 2
procOffset = prefixSum(procCount)

//phase 3
for each processor p in parallel
  int j = 0;
  for(int i = 0; i < S; i++)
    if(valid(input_p[i])){
      output[procOffset[p] + j] = input_p[i];
      j++;
    }

Note that phase 2 can also be done in parallel, and that its implementation on SIMT hardware would require a separate discussion (maybe in a future article). We will therefore use an available implementation, such as the one shipped with the Thrust library. In CUDA the load of this phase is usually negligible with respect to the other two, as the number of processors P is usually several orders of magnitude smaller than N.

Stream Compaction - CUDA SIMT


GPU hardware is made of a number of streaming multiprocessors (SMMs), on the order of tens, which roughly speaking are SIMD processors made up of streaming processors (the SIMD lanes, SPs). The Maxwell architecture introduced by NVIDIA counts 16 SMMs and 128 SPs per SMM, for a total of 2048 cores.

Maxwell NVIDIA SMM

An SMM executes kernels in a SIMD fashion in groups of 32 threads (warps) that execute the same instruction and are synchronized. SMMs are not scalar processors; hence, using the parallel approach described above, where each SMM is treated as a single scalar processor p, would result in poor performance due to the large number of idle lanes: 16*(128-1) = 2048-16 = 2032.

GPU strategy

In this implementation each SMM is fully used to perform the block counting of valid elements and to finally compact the input.

Phase 1

The following pseudocode shows how block counting can be performed on SIMT hardware using all the lanes in parallel.

//input_p is an SMM-private vector containing the portion of the
//original input to be processed by the processor
for each SMM processor p in parallel
  lanesCount[0] = 0;
  ...
  lanesCount[31] = 0;
  for(int i = 0; i < S; i += 32)
    for each SIMD lane s in parallel
      if(predicate(input_p[i+s]))
        lanesCount[s]++;

  procCount[p] = reduce(+, lanesCount);

With the introduction of the intrinsic function __syncthreads_count(predicate), this phase is both easier to implement (with respect to the pseudocode above) and results in an efficient execution. This instruction synchronizes the threads at block level and takes an integer predicate. It returns to *ALL threads of the block* the number of non-zero predicates passed to it by the block's threads.
Suppose for instance that our kernel is executed by a single block made up of four threads and that each thread calls __syncthreads_count() as follows:

//thread 0
int BC = __syncthreads_count(1);
//thread 1
int BC = __syncthreads_count(0);
//thread 2
int BC = __syncthreads_count(1);
//thread 3
int BC = __syncthreads_count(0);

then each thread t_i will own a private copy of the BC variable containing the value 2!
This is exactly what we need in order to count the number of valid elements per block!

The code for the phase 1 is the following:

template <typename T, typename Predicate>
__global__ void
computeBlockCounts(T* d_input, int length, int* d_BlockCounts, Predicate predicate){
  int idx = threadIdx.x + blockIdx.x*blockDim.x;
  // every thread of the block must reach __syncthreads_count,
  // so out-of-range threads simply contribute a zero predicate
  int pred = (idx < length) ? predicate(d_input[idx]) : 0;
  int BC = __syncthreads_count(pred);
  if(threadIdx.x == 0)
    d_BlockCounts[blockIdx.x] = BC;
}

Phase 2

Phase 2, as said before, is just an all-prefix-sum operation on the block counters; for the sake of brevity, the Thrust implementation is used here, producing the vector of offsets d_BlocksOffset.

cudaMalloc(&d_BlocksCount,sizeof(int)*numBlocks);
cudaMalloc(&d_BlocksOffset,sizeof(int)*numBlocks);
thrust::device_ptr<int> thrustPrt_bCount(d_BlocksCount);
thrust::device_ptr<int> thrustPrt_bOffset(d_BlocksOffset);
//phase1: compute block counters
computeBlockCounts<<<numBlocks,blockSize>>>(d_input,length,d_BlocksCount,predicate);

//prefix sum thrust call
thrust::exclusive_scan(thrustPrt_bCount, thrustPrt_bCount + numBlocks, thrustPrt_bOffset);

...
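
For completeness, numBlocks is not defined in the snippet above; a typical way to derive it from the input length and block size would be the following (a standard rounding-up sketch of ours, not code from the original source):

int blockSize = 256;                                   // threads per block, a multiple of warpSize
int numBlocks = (length + blockSize - 1) / blockSize;  // ceil(length / blockSize)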

Phase 3

The most elaborate part of the algorithm takes as input the "to be compacted" stream and the block offsets computed during the previous two phases, and outputs the compacted stream. It is based on the intra-warp voting function __ballot(), the __popc()((Note that this requires that the machine supports a warp-size word. If the warp size is increased, this may cause compatibility problems.)) procedure, and some bit manipulation. Let's start gradually by first throwing light on the __ballot() function.

unsigned int __ballot(int predicate);
evaluates predicate for all threads of the warp and returns an integer whose Nth bit is set if and only if predicate evaluates to non-zero for the Nth thread of the warp.

The __popc(int number) function returns the number of set bits in its parameter.
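
To make the pair concrete, here is a host-side mock of the same bit logic (a toy sketch of ours; __builtin_popcount is the GCC/Clang host analogue of __popc):

#include <cstdio>

int main() {
    // Pretend lanes 0 and 2 of a warp voted true: __ballot would return 0b0101.
    unsigned ballot = 5;        // 0b0101
    unsigned below_lane_2 = 3;  // 0b0011, mask of the lanes strictly below lane 2
    printf("%d\n", __builtin_popcount(ballot));                 // 2: valid lanes in the warp
    printf("%d\n", __builtin_popcount(ballot & below_lane_2)); // 1: valid lanes below lane 2
    return 0;
}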

The algorithm works as follows:

  • It first computes, for each thread, a per-warp offset. The offset of the last thread in a warp is then the number of valid elements in that warp. These two values are stored for subsequent use (the thread offset in a private register t_u, while the warp's valid-element count goes to shared memory, warpTotals[warpIndex]).
int pred = predicate(threadInput);
int w_i = threadIdx.x/warpSize;        //warp index
int w_l = idx % warpSize;              //thread index within a warp (warp lane)
int t_m = INT_MAX >> (warpSize-w_l-1); //mask of the lanes below this one
int b   = __ballot(pred) & t_m;        //valid lanes with lower index
int t_u = __popc(b);                   //intra-warp offset of this thread
if(w_l == warpSize-1)
   warpTotals[w_i] = t_u + pred;
__syncthreads();

where w_i is the warp index within the block, w_l is the thread index within the warp, and t_m is the thread mask, i.e. a number whose only set bits are the ones with index less than w_l (for example, for w_l = 3, t_m = 0b111). b then contains set bits only at the positions of lower-index threads with a valid predicate. __popc is used to retrieve the number of set bits; t_u is the warp offset of thread w_l.

  • A block-level fence (the __syncthreads() above) is required in order to ensure shared memory coherency.

  • An intra-block scan operation is now performed on the warpTotals vector in order to compute a per-warp offset within the block, as the following fragment shows. It can be performed by just one warp, assuming the number of warps does not exceed the size of a warp (warpsize \geq \frac{blocksize}{warpsize}). The scan is performed in log2(warpsize) steps because the numbers being summed have warpsize as maximum value (each warp cannot perform more writes than its number of threads). In CUDA this number is usually 32, hence the bit scan is performed in 5 steps.

if(w_i==0 && w_l<blockDim.x/warpSize){
  int w_i_u = 0;
  //log2(warpsize)=5 if warpsize=32
  for(int j=0; j<log2(warpsize); j++){
    int b_j = __ballot( warpTotals[w_l] & pow2i(j) ); //bit j of each per-warp total
    w_i_u += (__popc(b_j & t_m)) << j;
  }
  warpTotals[w_l] = w_i_u;
}

In other words, the i^{th} bit of b_j is one if and only if the j^{th} bit of the i^{th} per-warp counter is one. Each warp then masks only the bits i' < i (those of the warps before it) and finally sums them up.


Example per-warp offset (4 warps):

Suppose:
warpTotals[0] = 16 = 1 0 0 0 0
warpTotals[1] = 18 = 1 0 0 1 0
warpTotals[2] = 17 = 1 0 0 0 1
warpTotals[3] = 13 = 0 1 1 0 1

w_0 always outputs zero because t_m = 0, hence (b_j & t_m) = 0

Step 0:

b_0 = __ballot( warpTotals[w_l] & pow2i(0) ) = 12 (column zero)
w_1 = __popc(12 & 1) << 0 = 0 << 0 = 0
w_2 = __popc(12 & 3) << 0 = 0 << 0 = 0
w_3 = __popc(12 & 7) << 0 = 1 << 0 = 1

Step 1:

b_1 = __ballot( warpTotals[w_l] & pow2i(1) ) = 2 (column 1)
w_1 = __popc(2 & 1) << 1 = 0 << 1 = 0
w_2 = __popc(2 & 3) << 1 = 1 << 1 = 2
w_3 = __popc(2 & 7) << 1 = 1 << 1 = 2

Step 2:

b_2 = __ballot( warpTotals[w_l] & pow2i(2) ) = 8 (column 2)
w_1 = __popc(8 & 1) << 2 = 0 << 2 = 0
w_2 = __popc(8 & 3) << 2 = 0 << 2 = 0
w_3 = __popc(8 & 7) << 2 = 0 << 2 = 0

Step 3:

b_3 = __ballot( warpTotals[w_l] & pow2i(3) ) = 8 (column 3)
w_1 = __popc(8 & 1) << 3 = 0 << 3 = 0
w_2 = __popc(8 & 3) << 3 = 0 << 3 = 0
w_3 = __popc(8 & 7) << 3 = 0 << 3 = 0

Step 4:

b_4 = __ballot( warpTotals[w_l] & pow2i(4) ) = 7 (column 4)
w_1 = __popc(7 & 1) << 4 = 1 << 4 = 16
w_2 = __popc(7 & 3) << 4 = 2 << 4 = 32
w_3 = __popc(7 & 7) << 4 = 3 << 4 = 48

The final result for each warp is simply the sum of the w_i_u at all steps:
w_0 = 0 + 0 + 0 + 0 + 0 = 0
w_1 = 0 + 0 + 0 + 0 + 16 = 16
w_2 = 0 + 2 + 0 + 0 + 32 = 34 = 16+18
w_3 = 1 + 2 + 0 + 0 + 48 = 51 = 16+18+17
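
The same arithmetic can be checked on the host with plain bit operations (a toy verification of ours; __ballot is emulated by gathering bit j of every counter, and __builtin_popcount stands in for __popc):

#include <cstdio>

int main() {
    int warpTotals[4] = {16, 18, 17, 13};
    for (int i = 0; i < 4; i++) {
        int t_m = (1 << i) - 1;          //mask of the warps before warp i
        int w_i_u = 0;
        for (int j = 0; j < 5; j++) {
            int b_j = 0;                 //emulated ballot: bit i = bit j of counter i
            for (int k = 0; k < 4; k++)
                b_j |= ((warpTotals[k] >> j) & 1) << k;
            w_i_u += __builtin_popcount(b_j & t_m) << j;
        }
        printf("w_%d = %d\n", i, w_i_u); //prints 0, 16, 34, 51
    }
    return 0;
}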


 

  • Knowing the warp, block, and grid offsets, it is now possible to flush out the valid elements at the correct locations. If the current thread is managing a valid element, it flushes it out at location: (its offset within its warp) + (its warp's offset within the block) + (the block's offset within the grid).
if(pred){
  d_output[t_u + warpTotals[w_i] + d_BlocksOffset[blockIdx.x]] = d_input[idx];
}
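
Putting the fragments above together, a sketch of the full phase-3 kernel could look like the following. The kernel name, the pow2i helper, the shared-memory declaration, and the second __syncthreads() are our additions, so treat this as illustrative rather than the exact repository code (note also that on CUDA 9+ the __ballot_sync variant should replace __ballot):

#include <climits> //INT_MAX

__device__ __inline__ int pow2i(int e){ return 1 << e; }

template <typename T, typename Predicate>
__global__ void compactBlock(T* d_input, int length, T* d_output,
                             int* d_BlocksOffset, Predicate predicate){
  __shared__ int warpTotals[32];           //one slot per warp (blockDim.x <= 1024)
  int idx = threadIdx.x + blockIdx.x*blockDim.x;
  int pred = (idx < length) ? predicate(d_input[idx]) : 0;

  int w_i = threadIdx.x/warpSize;          //warp index within the block
  int w_l = idx % warpSize;                //lane index within the warp
  int t_m = INT_MAX >> (warpSize-w_l-1);   //mask of the lanes below this one
  int b   = __ballot(pred) & t_m;          //valid lanes with lower index
  int t_u = __popc(b);                     //intra-warp offset of this thread

  if(w_l == warpSize-1)
    warpTotals[w_i] = t_u + pred;          //number of valid elements in this warp
  __syncthreads();

  //intra-block scan of the per-warp totals, done by the first warp
  if(w_i == 0 && w_l < blockDim.x/warpSize){
    int w_i_u = 0;
    for(int j = 0; j < 5; j++){            //log2(32) steps
      int b_j = __ballot( warpTotals[w_l] & pow2i(j) );
      w_i_u += (__popc(b_j & t_m)) << j;
    }
    warpTotals[w_l] = w_i_u;
  }
  __syncthreads();                         //totals must be scanned before they are read

  if(pred)
    d_output[t_u + warpTotals[w_i] + d_BlocksOffset[blockIdx.x]] = d_input[idx];
}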

 

 

 
