Menu Sidebar
Menu

Apache Flink

Very Fast Reservoir Sampling by Erik Erlandson

今天正式的把这个抽样算法加到Apache Flink里了, 这个算法是Erik Erlandson在他的博客上公布的大概是迄今为止最快速的抽样算法的. 这个算法采用了流行的gap distribution的方法抽样, 有效的在减少cpu使用的情况下, 减少了内存的占用, 通过生成抽样之间的gap, 进行近似随机抽样. 在他的博客http://erikerlandson.github.io/blog/2015/08/17/the-reservoir-sampling-gap-distribution/中,  证明了抽样可以通过生成gap实现随机抽样, 大大减少了随机数的生成时间和占用的内存, 实际应用下, 可以大大增加整体系统的运行效率. 他的博客中, 有基于Bernoulli Distribution(https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/sampling/BernoulliSampler.java)和Poisson Distribution(https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/sampling/PoissonSampler.java)的两种实现, 在Apache Flink中, Bernoulli分布实现了非replacement的抽样, 而Poisson分布实现了replacement的抽样. 这个新的优化算法, 使用了几何分布(https://en.wikipedia.org/wiki/Geometric_distribution)的思想, 对于样品大小远远小于数据大小 (1000000倍以上)的情况下, 样品的抽样率近似于P = R/j, R是样品集合大小, j是当前数据的总数. 这个算法有两部分组成. 作者定义了一个阈值T, T=4R, R是样品集合的大小. 这里的4是由随机数的好坏确定的. 确定的方法很复杂. 如果数据量小于阈值T, 则使用传统的水塘抽样. 这里使用水塘抽样的原因是, 如果当前的数据量小, 则生成gap会大大影响抽样结果的分布. 因为这个算法的gap是通过累计分布最后达到均匀分布的, 少量的gap会产生极大的误差. 通过实验表明, KS-test下, 如果样品大小是100,数据量是100000, 那么与随机抽样比较, 误差远远超过了 KS-test的容忍值D. 在数据量小的情况下, 生成少量随机数并不会对系统产生太大的负担, 这种trade-off是可以接受的. 水塘抽样的分布和数据量无关, 所以即使抽样大小和数据量很接近, […]

Kolmogorov–Smirnov测试

今天看Apache Flink是怎么测试streaming data的random sampling的. 然后看到里面测试代码(https://github.com/apache/flink/blob/master/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java)用的是Kolmogorov–Smirnov测试, 就查了一下相关的文献, 发现这个测试真是好用. 首先, 这个测试有很多的用法, 我使用它的原因主要是用于测试sampling是否是随机的. 在分布式系统下的sampling, 不能紧紧通过观察结果就判断当前抽样函数是否是随机抽样, 这个测试可以快速的测试一个expected result(期待函数的抽样结果)和一个actual result(当前函数的实际抽样结果), 通过勾画两个生成抽样的函数, 找到其中排序后, 每个误差最大值, 然后累计相加, 取生成一个p值, 通过对比p值和d值(d值为一个固定量), 可以直到当前抽样是否接近于随机抽样. p如果小于d,则是类随机抽样, 反之则否. p的公式很复杂, 但是代码很易懂,这里上下代码: //x和y是两组抽样结果, 通过这个函数, 返回p值, 这个函数可以判断这两个结构是不是来自于同一个抽样函数 public double kolmogorovSmirnovStatistic(double[] x, double[] y) { this.checkArray(x); this.checkArray(y); double[] sx = MathArrays.copyOf(x); double[] sy = MathArrays.copyOf(y); Arrays.sort(sx);//排序 Arrays.sort(sy); int n = sx.length; int m […]

Apache Flink中实现的两种水塘抽样算法

因为Flink和我毕设有关, 所以这几天一直在看Flink的源码. Flink中实现了两种水塘抽样算法: Reservoir Sampler With Replacement https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.html Reservoir Sampler Without Replacement https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.html 对应的paper是: Optimal Random Sampling from Distributed Streams Revisited http://researcher.watson.ibm.com/researcher/files/us-dpwoodru/tw11.pdf Random Sampling with a Reservoir http://www.cs.umd.edu/~samir/498/vitter.pdf 两种抽样的区别,(假设, k是已知的抽样数量): With Replacement是先取第一个样品, 然后做k个’复制'(这里的复制是对象是样品的value, 每个复制都有一个weight, 这个weight是随机生成的, 在Flink中, 用的是XOrShift随机数), 装满 大小为k的PriorityQueue中, 然后每次取下一个样品, 都做k个’复制’,然后如果’复制’的样品的weight大于当前PriorityQueue中, 就Remove PriorityQueue中队列顶端元素, 然后把当前’复制’放入PriorityQueue. Without Replacement是现取k个样品, 存入大小为k的PriorityQueue中, 每次取新的样品时, 算一个随机数, 如果这个随机数比当前PriorityQueue的队列顶端元素的weight值大,就Remove PriorityQueue中队列顶端元素, 然后放入当前元素. 值得一提的是, Flink中, 虽然在partition过程中实现了以上两种算法, 但是在reduce的过程, 并没有区别, 全部都用的是[2]算法, 这里我想,也许是为了保证reduce的速度, […]

书脊

这青苔碧瓦堆, 俺曾睡风流觉, 将五十年兴亡看饱.

December 2024
M T W T F S S
 1
2345678
9101112131415
16171819202122
23242526272829
3031