正常情况下,一个流在执行一次终端操作之后便结束了。本文通过复制流内数据的方式,曲折的实现了同一个流上执行多次操作。
Demo只是思路,其性能并不一定高效,尤其是数据都在内存中处理时复制的开销很大。但如果流涉及大量I/O,也许性能会有提高。public class StreamForker{ private final Stream stream; private final Map
accept
方法将原始流中所有的数据添加到各个BlockingQueue内,此处实现了复制
class ForkingStreamConsumerimplements Consumer , Results { static final Object END_OF_STREAM = new Object(); private final List > queues; private final Map > actions; public ForkingStreamConsumer(List > queues, Map > actions) { this.queues = queues; this.actions = actions; } @Override public void accept(T t) { queues.forEach(q -> q.add(t)); } @SuppressWarnings("unchecked") void finish() { accept((T) END_OF_STREAM); } @SuppressWarnings("unchecked") @Override public R get(Object key) { try { return ((Future ) actions.get(key)).get(); } catch (Exception e) { throw new RuntimeException(e); } }}
此处重写了tryAdvance
接口,只是简单的从BlockingQueue中取出数据,执行action。业务逻辑中复制流是为了做什么事情,action就是这件事情。ForkingStreamConsumer.END_OF_STREAM
是Queue中数据结束的标示
class BlockingQueueSpliteratorimplements Spliterator { private final BlockingQueue q; BlockingQueueSpliterator(BlockingQueue q) { this.q = q; } @Override public boolean tryAdvance(Consumer action) { T t; while (true) { try { t = q.take(); break; } catch (InterruptedException e) { } } if (t != ForkingStreamConsumer.END_OF_STREAM) { action.accept(t); return true; } return false; } @Override public Spliterator trySplit() { return null; } @Override public long estimateSize() { return 0; } @Override public int characteristics() { return 0; }}