Today I’m going to write a little about Approximate Histograms and how they can be used to get more insight into streamed big data feeds. I also provide a simple Java implementation and explain some parts of it.
Most common aggregation operations, like counting and summing, can be performed in parallel as long as there is a reduce phase where the results from each node are combined. However, this is not trivial for histograms: to build a histogram of a dimension, we need all of that dimension's data in one place.
With the data being processed by multiple nodes, each node is only able to build a histogram of the partial data it receives. Ben-Haim and Tom-Tov presented a solution that uses a heap-based data structure to represent the data, together with a merge algorithm that combines the structures computed on different nodes into an approximate histogram of the whole dataset.
This technique has been applied by MetaMarkets with good accuracy for most of what a histogram can tell us about a data distribution: calculating the average, the quartiles, and the total number of events.
I took the liberty of writing a simple implementation, which has now been running in production for a few months:
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;

/**
 * This is an approximate histogram class
 */
public class ApproximateHistogram {

    private final int numPairs;
    private final TreeSet<CentroidPair> heap;

    public ApproximateHistogram(int numPairs) {
        this.numPairs = numPairs;
        this.heap = new TreeSet<>();
    }

    public ApproximateHistogram(TreeSet<CentroidPair> heap, int numPairs) {
        this.numPairs = numPairs;
        this.heap = heap;
    }

    public Set<CentroidPair> heap() {
        return ImmutableSet.copyOf(heap);
    }

    public void update(CentroidPair p) {
        Iterator<CentroidPair> it = heap.iterator();
        while (it.hasNext()) {
            CentroidPair cp = it.next();
            int compare = Double.compare(cp.centroid, p.centroid);
            if (compare == 0) {
                cp.count += p.count;
                return;
            } else if (compare > 0) {
                break;
            }
        }
        // there was no similar centroid, so let's add the point to the heap
        heap.add(p);
        compress();
    }

    private void compress() {
        if (heap.size() <= numPairs) {
            return; // compress only if needed
        }
        int i = 0;
        double minDiff = Double.MAX_VALUE;
        CentroidPair last = null, lastLast = null;
        // [ ..., minA, minB, ... ] the two consecutive pairs whose centroid diff is the minimum
        CentroidPair minA = null, minB = null;
        Iterator<CentroidPair> it = heap.iterator();
        while (it.hasNext()) {
            lastLast = last;
            last = it.next();
            if (i > 0) {
                double diff = last.centroid - lastLast.centroid;
                if (diff < minDiff) {
                    minA = lastLast;
                    minB = last;
                    minDiff = diff;
                }
            }
            ++i;
        }
        int repCount = Math.abs(minA.count) + Math.abs(minB.count);
        double repCentroid = (minA.centroid * Math.abs(minA.count) + minB.centroid * Math.abs(minB.count)) / repCount;
        // store compressed entries with a negative sign
        CentroidPair replacementPair = new CentroidPair(-repCount, repCentroid);
        heap.remove(minA);
        heap.remove(minB);
        heap.add(replacementPair);
    }

    public static ApproximateHistogram merge(ApproximateHistogram... histograms) {
        ApproximateHistogram merged = histograms[0];
        for (int i = 1; i < histograms.length; i++) {
            merged = merge(merged, histograms[i]);
        }
        return merged;
    }

    public static ApproximateHistogram merge(ApproximateHistogram a, ApproximateHistogram b) {
        int biggestSize = a.heap.size();
        if (b.heap.size() > biggestSize) {
            biggestSize = b.heap.size();
        }
        // put all the centroids of A and B in one heap (ignoring compression for now)
        TreeSet<CentroidPair> mergedHeap = Sets.newTreeSet();
        mergedHeap.addAll(a.heap);
        mergedHeap.addAll(b.heap);
        final ApproximateHistogram merged = new ApproximateHistogram(mergedHeap, biggestSize);
        // then compress until the merged heap is back to the allowed size
        int compressTimes = mergedHeap.size() - biggestSize;
        while (compressTimes-- > 0) {
            merged.compress();
        }
        return merged;
    }

    public double countBelow(double cutPoint) {
        final double EPSILON = 0.00000001;
        if (heap.isEmpty()) return 0.0;
        CentroidPair[] heapPoints = heap.toArray(new CentroidPair[heap.size()]);
        int j = 0;
        for (int i = 0; i < heapPoints.length; i++) {
            int count = heapPoints[i].count;
            double diff = heapPoints[i].centroid - cutPoint;
            if (Math.abs(diff) < EPSILON) {
                // there's a pair with the cutPoint as centroid
                return j + ((count > 0) ? count : Math.abs(count) / 2.0);
            } else if (diff > 0) {
                // we already passed it: the cutPoint is somewhere between the last pair and this one
                if (i == 0) {
                    // CASE: the cutPoint is before the first centroid
                    if (count > 0) return 0.0; // we are sure no entry was less than the first centroid
                    // the first pair is an average, so do the interpolation
                    return Math.abs(count) * cutPoint / (2.0 * heapPoints[i].centroid);
                }
                CentroidPair lastPoint = heapPoints[i - 1];
                CentroidPair currentPoint = heapPoints[i];
                int lastCount = lastPoint.count;
                // if the last point is a compressed (average) point, discount its full count;
                // half of it is added back below
                j -= ((lastCount < 0) ? Math.abs(lastCount) : 0);
                lastCount = Math.abs(lastCount);
                // linear interpolation of the histogram "height" at the cutPoint,
                // followed by the trapezoid area between the last centroid and the cutPoint
                double mb = lastCount + (Math.abs(currentPoint.count) - lastCount) * (cutPoint - lastPoint.centroid) / (currentPoint.centroid - lastPoint.centroid);
                double sum = (lastCount + mb) * (cutPoint - lastPoint.centroid) / (2.0 * (currentPoint.centroid - lastPoint.centroid));
                return sum + j + Math.abs(lastCount) / 2.0;
            }
            j += Math.abs(count);
        }
        // the cutPoint is beyond the last centroid
        CentroidPair lastPoint = heapPoints[heapPoints.length - 1];
        int count = lastPoint.count;
        // last point is an average and there's more than one point
        if (count < 0 && heapPoints.length > 1) {
            count = Math.abs(count);
            CentroidPair lastLastPoint = heapPoints[heapPoints.length - 2];
            // place a virtual final point beyond the last centroid,
            // at a quarter of the distance to the previous point
            double distanceToPreviousPoint = lastPoint.centroid - lastLastPoint.centroid;
            distanceToPreviousPoint /= 4.0;
            double finalCentroid = lastPoint.centroid + distanceToPreviousPoint;
            double diff = finalCentroid - cutPoint;
            if (diff > 0) {
                // the cutPoint falls before the virtual final point: interpolate the last stretch
                j -= count / 2.0;
                double trapezoidSum = count * (cutPoint - lastPoint.centroid) / (2.0 * distanceToPreviousPoint);
                return j + trapezoidSum;
            }
            // else everything is below the cutPoint: just return j
        }
        return j;
    }

    public double avg() {
        int count = 0;
        double sum = 0.0;
        for (CentroidPair centroidPair : heap) {
            int absCount = Math.abs(centroidPair.count);
            count += absCount;
            sum += absCount * centroidPair.centroid;
        }
        return (count > 0) ? sum / count : 0.0;
    }

    public int count() {
        int sum = 0;
        for (CentroidPair centroidPair : heap) {
            // compressed pairs carry a negative count, so take the absolute value
            sum += Math.abs(centroidPair.count);
        }
        return sum;
    }

    public static class CentroidPair implements Comparable<CentroidPair> {

        int count;
        double centroid;

        public CentroidPair(int count, double centroid) {
            this.count = count;
            this.centroid = centroid;
        }

        @Override
        public int compareTo(CentroidPair o) {
            return Double.compare(this.centroid, o.centroid);
        }

        @Override
        public String toString() {
            return new StringBuilder("(").append(count).append(", ").append(centroid).append(")").toString();
        }
    }
}
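For reference, here is a minimal usage sketch; the wrapper class name, the limit of 32 pairs and the sample values are arbitrary choices of mine, not part of the implementation above:

public class ApproximateHistogramExample {
    public static void main(String[] args) {
        // keep at most 32 (count, centroid) pairs
        ApproximateHistogram histogram = new ApproximateHistogram(32);

        // feed raw observations: each one enters as a pair with count 1 and the value as centroid
        for (double value : new double[]{1.0, 2.5, 2.5, 7.0, 9.3}) {
            histogram.update(new ApproximateHistogram.CentroidPair(1, value));
        }

        System.out.println(histogram.count());         // total number of observations
        System.out.println(histogram.avg());           // approximate average
        System.out.println(histogram.countBelow(5.0)); // approximate number of values below 5.0
    }
}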
Internally the histogram is represented by a set of points (count, centroid), ordered by centroid. When a new point is added, if its centroid already exists we increase the count of the existing pair; otherwise we add a new point with count 1 to the set.
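For example, updates with an identical centroid collapse into a single pair:

ApproximateHistogram h = new ApproximateHistogram(10);
h.update(new ApproximateHistogram.CentroidPair(1, 3.0));
h.update(new ApproximateHistogram.CentroidPair(1, 3.0)); // same centroid: the existing pair's count becomes 2
h.update(new ApproximateHistogram.CentroidPair(1, 5.0)); // new centroid: a new pair is added
// h.heap() now contains (2, 3.0) and (1, 5.0)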
Each histogram has a limit on the number of points it keeps, and when an insert exceeds this limit a compression takes place. Compression consists of merging the two consecutive points whose centroids are closest. They are replaced by a single point whose centroid is the count-weighted average of the two, so it lies nearer to whichever point has the higher count (or exactly in the middle if the counts are equal). The count of the new point is the sum of the two old ones.
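As a small worked example (the numbers are made up), with the limit set to two pairs:

ApproximateHistogram h = new ApproximateHistogram(2);
h.update(new ApproximateHistogram.CentroidPair(3, 1.0));
h.update(new ApproximateHistogram.CentroidPair(1, 2.0));
h.update(new ApproximateHistogram.CentroidPair(1, 10.0)); // a third pair exceeds the limit, so compression kicks in
// (3, 1.0) and (1, 2.0) have the closest centroids, so they are replaced by a single pair
// with count 3 + 1 = 4 and centroid (1.0 * 3 + 2.0 * 1) / 4 = 1.25, nearer to the heavier point;
// h.heap() now contains (-4, 1.25) and (1, 10.0) -- the negative sign is explained below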
As Java doesn’t have unsigned numeric types, this implementation exploits the sign of the count field to flag whether a point originated from the compression of two other points or comes from raw observations. This helps answer questions like: how many values are below X? If every point whose centroid is below X has a positive count, we can count them exactly. If some counts are negative, we know those points are approximations, so we estimate the count using the trapezoidal interpolation of Ben-Haim and Tom-Tov. This gives more accurate results than assuming every point might be an approximation, and it requires no extra space in Java-based data structures.
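The interpolation step inside countBelow follows the sum procedure from the Ben-Haim and Tom-Tov paper. Extracted as a standalone helper for readability (the method name and parameter names are mine, not part of the class above), it looks like this:

// p1 = (count1, c1) is the pair just below the cut point, p2 = (count2, c2) the pair just above it;
// counts are taken as absolute values by the caller
static double trapezoidBelow(double count1, double c1, double count2, double c2, double cutPoint) {
    // linearly interpolated "height" of the histogram at the cut point
    double mb = count1 + (count2 - count1) * (cutPoint - c1) / (c2 - c1);
    // area of the trapezoid between c1 and the cut point
    return (count1 + mb) * (cutPoint - c1) / (2.0 * (c2 - c1));
}

The linear interpolation reflects the paper's view of the histogram as piecewise linear between centroids.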
Merging histograms is needed when we want to combine results computed on different nodes. It is done by creating a big heap with the combined points of all the histograms and applying compression on that heap, as described above, until it is back to the maximum number of points.
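Continuing the sketch from above, merging looks like this (each node would build its own partial histogram and ship it to a combiner):

ApproximateHistogram partialA = new ApproximateHistogram(32); // built on node A
ApproximateHistogram partialB = new ApproximateHistogram(32); // built on node B
// ... each node calls update() with its own slice of the stream ...

// combine the partial results into one approximate histogram of the full stream
ApproximateHistogram global = ApproximateHistogram.merge(partialA, partialB);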
For very dispersed data, this data structure may yield poor approximations if the number of points is not high enough. Still, it is very flexible: it is easy to adapt it to streams with different distributions simply by tuning the number of centroids we keep.