博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
如何以并发方式在同一个流上执行多种操作?--复制流
阅读量:6714 次
发布时间:2019-06-25

本文共 3043 字,大约阅读时间需要 10 分钟。

正常情况下,一个流在执行一次终端操作之后便结束了。本文通过复制流内数据的方式,曲折的实现了同一个流上执行多次操作。

Demo只是思路,其性能并不一定高效,尤其是数据都在内存中处理时复制的开销很大。但如果流涉及大量I/O,也许性能会有提高。

public class StreamForker
{ private final Stream
stream; private final Map
, ?>> forks = new HashMap<>(); public StreamForker(Stream
stream) { this.stream = stream; } public StreamForker
fork(Object key, Function
, ?> f) { forks.put(key, f); return this; } public Results getResults() { ForkingStreamConsumer
consumer = build(); try { stream.sequential().forEach(consumer); } finally { consumer.finish(); } return consumer; } private ForkingStreamConsumer
build() { List
> queues = new ArrayList<>(); Map
> actions = forks.entrySet().stream().reduce(new HashMap
>(), (map, e) -> { map.put(e.getKey(), getOperationResult(queues, e.getValue())); return map; }, (m1, m2) -> { m1.putAll(m2); return m1; }); return new ForkingStreamConsumer<>(queues, actions); } private Future
getOperationResult(List
> queues, Function
, ?> f) { BlockingQueue
queue = new LinkedBlockingQueue<>(); queues.add(queue); Spliterator
spliterator = new BlockingQueueSpliterator<>(queue); Stream
source = StreamSupport.stream(spliterator, false); return CompletableFuture.supplyAsync(() -> f.apply(source)); }}

accept方法将原始流中所有的数据添加到各个BlockingQueue内,此处实现了复制

class ForkingStreamConsumer
implements Consumer
, Results { static final Object END_OF_STREAM = new Object(); private final List
> queues; private final Map
> actions; public ForkingStreamConsumer(List
> queues, Map
> actions) { this.queues = queues; this.actions = actions; } @Override public void accept(T t) { queues.forEach(q -> q.add(t)); } @SuppressWarnings("unchecked") void finish() { accept((T) END_OF_STREAM); } @SuppressWarnings("unchecked") @Override public
R get(Object key) { try { return ((Future
) actions.get(key)).get(); } catch (Exception e) { throw new RuntimeException(e); } }}

此处重写了tryAdvance接口,只是简单的从BlockingQueue中取出数据,执行action。业务逻辑中复制流是为了做什么事情,action就是这件事情。ForkingStreamConsumer.END_OF_STREAM是Queue中数据结束的标示

class BlockingQueueSpliterator
implements Spliterator
{ private final BlockingQueue
q; BlockingQueueSpliterator(BlockingQueue
q) { this.q = q; } @Override public boolean tryAdvance(Consumer
action) { T t; while (true) { try { t = q.take(); break; } catch (InterruptedException e) { } } if (t != ForkingStreamConsumer.END_OF_STREAM) { action.accept(t); return true; } return false; } @Override public Spliterator
trySplit() { return null; } @Override public long estimateSize() { return 0; } @Override public int characteristics() { return 0; }}

转载地址:http://foelo.baihongyu.com/

你可能感兴趣的文章
互联网协议入门(2)
查看>>
DataSource的可配参数有哪些,有哪些DataSource可以用
查看>>
本地文件共享服务(nfs samba ftp)
查看>>
scp通过代理proxy传输文件
查看>>
数据段、代码段、堆栈段、BSS段的区别
查看>>
WebService之Axis2快速入门(5): 管理会话(Session)
查看>>
以太坊RPC接口使用
查看>>
普通html标签<form>和struts2<s:form>的区别
查看>>
安装NTFS For Mac时显示文件已损坏怎么办
查看>>
-webkit-line-clamp实现多行文字溢出隐藏显示省略号
查看>>
配置sunspot tomcat结合sunspot_rails
查看>>
飞信系统4月29日升级后飞信机器人无法使用的解决办法
查看>>
Canonical今天宣布推出Plex Media Server作为Snap Store中的Snap应用程序
查看>>
Font Awesome
查看>>
Dubbo消费者
查看>>
虚拟化中虚拟机处理器核数与物理主机cpu的关系
查看>>
org.codehaus.jackson.map.JsonMappingException: No suitable constructor found for type
查看>>
MYSQL: mysqlbinlog读取二进制文件报错read_log_event()
查看>>
随机产生由特殊字符,大小写字母以及数字组成的字符串,且每种字符都至少出现一次...
查看>>
我的友情链接
查看>>