博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Tiny并行计算框架之使用介绍
阅读量:6213 次
发布时间:2019-06-21

本文共 12470 字,大约阅读时间需要 41 分钟。

hot3.png

呵呵,昨天看到两新粉,一激动,就想着今天来写这篇文章。

其实一直在关注这个领域,但是一直没有信心来写,所以一直期望着有一个开源的来用。

看到了彭渊大师的,确实有一种相见恨晚的感觉,于是就准备去研究一番,详细见本人的感想文章由,确实来说,感觉到有一种啃不动的感觉,当然也可能是本人水平不足的原因所致。但是不管怎么说,促动了本人来写一个简单的并行计算框架。

在此引用本人的:“牛人的代码就是生手也一看就懂;生手的代码就是牛人来了也看不懂。

好的,亲们,不管你是生手还是牛人,let's GO!

HelloWorld之一

当然,还是从Hello说起,不过这次的hello与之前不太一样,管呢,先看看再说:

public class WorkerHello extends AbstractWorker {    public WorkerHello() throws RemoteException {        super("hello");    }    public Warehouse doWork(Work work) throws RemoteException {        String name = work.getInputWarehouse().get("name");        System.out.println(String.format("id %s: Hello %s", getId(), name));        Warehouse outputWarehouse = new WarehouseDefault();        outputWarehouse.put("helloInfo", "Hello," + name);        return outputWarehouse;    }}

首先,工人Hello继承自抽象工人,也就是说他首先得是个工人,然后呢是个Hello工人。

在它的构造函数中,抛出一个RemoteException,表明,它是可以被远程调用的工人,在构造方法中调用super("hello"),表明这个工人是个干hello活的工人。

既然是工人么,因此当然得做工作了。

首先从工作的的仓库中取出一个叫name的字符串,然后控制台打一下,然后构建了一个输出的仓库,在里面放了一个helloInfo的字符串,然后返回输出仓库,工人的任务就算完成了。

下面看看示例代码:

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        JobCenter jobCenter = new JobCenterLocal();        for (int i = 0; i < 5; i++) {            jobCenter.registerWorker(new WorkerHello());        }        Foreman helloForeman = new ForemanSelectOneWorker("hello");        jobCenter.registerForeman(helloForeman);        Warehouse inputWarehouse = new WarehouseDefault();        inputWarehouse.put("name", "world");        Work work = new WorkDefault("hello", inputWarehouse);        Warehouse outputWarehouse = jobCenter.doWork(work);        System.out.println(outputWarehouse.get("helloInfo"));        jobCenter.stop();    }

首先开个职业介绍所,然后构建一了5个Hello工人,放在注册到职业介绍所去。

然后又注册了一个专门干hello活的包工头到职业介绍所,这个包工头有点特别,随便找一个hello工人来干hello这个活。

然后,构建了一个工作,介个工作是个hello工作,它的来料仓库里放了个name是world的值。

然后他就对职业介绍所说,你帮咱把这个活干干。

活干完了,也没有发生异常,顺利的在结果仓库里找到了helloInfo这个值,并且从控制台打出。

下面是运行结果:

id 46fbffdeb18b45f28cda4617795c2a52: Hello worldHello,world

从上面的例子当中,我们理解了下面几个概念:

职业介绍所:JobCenter,主要用于注册工人,注册包工头,接受或处理任务;包工头:领取工作并招募工人,完成工作,并返回结果工人:就是我们常说的民工了,只知道来料加工,处于生态环境的低层,最后还没有得工资工作:只有工作类型和来料仓库仓库:用于放各种来料或成品

职业介绍所,一般来说不用写,框架已经提供;工作,一般来说不用写;工头,绝大多数不需要写,框架已经提供了若干类型工头,一般够用了;工人,一定需要写。

自此,简单的hello并行计算就算完成了。

HelloWorld之二

上面的hello工作完成之后,老板突发齐想,一个hello吼得声音太小了,偶想让所有的工人都帮偶齐声喊一起Hello,World,那该多壮观,当然老板有钱,说干就干:

public static void main(String[] args) throws IOException, ClassNotFoundException {        JobCenter jobCenter = new JobCenterLocal();        for (int i = 0; i < 5; i++) {            jobCenter.registerWorker(new WorkerHello());        }        Foreman helloForeman = new ForemanSelectAllWorker("hello");        jobCenter.registerForeman(helloForeman);        Warehouse inputWarehouse = new WarehouseDefault();        inputWarehouse.put("name", "world");        Work work = new WorkDefault("hello", inputWarehouse);        jobCenter.doWork(work);        jobCenter.stop();}

当然,这次的包工头换了一下,这个包工头会找所有的工人来干活,结果如下:

id 83274d8f8c194bb89d773c232e867cc4: Hello worldid 16fbf219d3cf4ba48eef23c260de509a: Hello worldid 9c17a119a4f341d68b589a503712b0f9: Hello worldid e7e3b2bdc9444a179ad62abdd35275e1: Hello worldid 4b12a1b70f5d43e2bff473382096dfbe: Hello world

老板一看,尼妈,这帮工人喊是喊完了,这声音就响过(用的是System.out)就没有了,也不知道有几个工人给喊过,包工头说哦,我没有干这收集数据的活,你想要呀,你想要就吱声呀,我加个结果合并给你:

public static void main(String[] args) throws IOException, ClassNotFoundException {    JobCenter jobCenter = new JobCenterLocal();    for (int i = 0; i < 5; i++) {        jobCenter.registerWorker(new WorkerHello());    }    Foreman helloForeman = new ForemanSelectAllWorker("hello", new HelloWorkCombiner());    jobCenter.registerForeman(helloForeman);    Warehouse inputWarehouse = new WarehouseDefault();    inputWarehouse.put("name", "world");    Work work = new WorkDefault("hello", inputWarehouse);    Warehouse outputWarehouse = jobCenter.doWork(work);    List
result = outputWarehouse.get("helloInfo"); System.out.println(result.size()); jobCenter.stop();}

Hello结果收集器,用于把工人干的活合并成一个结果出来:

public class HelloWorkCombiner implements WorkCombiner {    public Warehouse combine(List
warehouseList) throws RemoteException { Warehouse warehouse = new WarehouseDefault(); List
helloList = new ArrayList
(); for (Warehouse w : warehouseList) { helloList.add((String) w.get("helloInfo")); } warehouse.put("helloInfo", helloList); return warehouse; }}

老板终于称心如意了。

分布式求和

老板消停了一下下,又想,偶想知道从1加到10000这个结果值是多少。但是一个计算机算,算得太慢了,能不能多几台机器帮我看看,让我早些知道结果?(仅用于说明原理,你可以理解为从1加到10000需要几个小时)

首先造个工人:

public class WorkerSum extends AbstractWorker {    public WorkerSum() throws RemoteException {        super("sum");    }    public Warehouse doWork(Work work) throws RemoteException {        long start = (Long) work.getInputWarehouse().get("start");        long end = (Long) work.getInputWarehouse().get("end");        long sum = 0;        for (long i = start; i <= end; i++) {            sum += i;        }        Warehouse outputWarehouse = new WarehouseDefault();        outputWarehouse.put("sum", sum);        return outputWarehouse;    }}

工人从来料仓库获取开始和结束,然后计算合计值并放在输出仓库中的sum值域中。

但是这活该怎么分给工人呢,工人算完的结果又怎么合并呢?

这个时候,就需要搞个工作分解合并器给包工头用了:

public class SumSplitterCombiner implements WorkSplitterCombiner {    public List
split(Work work, List
workers) throws RemoteException { List
list = new ArrayList
(); long start = (Long) work.getInputWarehouse().get("start"); long end = (Long) work.getInputWarehouse().get("end"); long count = end - start + 1; long step = count / workers.size(); for (int i = 0; i < workers.size(); i++) { Warehouse subInputWarehouse = new WarehouseDefault(); subInputWarehouse.put("start", step * i + start); if (i == workers.size() - 1) { subInputWarehouse.put("end", end); } else { subInputWarehouse.put("end", step * (i + 1)); } list.add(subInputWarehouse); } return list; } public Warehouse combine(List
warehouseList) throws RemoteException { Warehouse outputWarehouse = new WarehouseDefault(); long sum = 0; for (Warehouse w : warehouseList) { sum += (Long) w.get("sum"); } outputWarehouse.put("sum", sum); return outputWarehouse; }}

一共两方法,一个分解方法,一个合并方法,非常容易理解。

万事具备,呵呵,开工:

public class Test {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        JobCenter jobCenter = new JobCenterLocal();        JobCenter center = new JobCenterRemote();        for (int i = 0; i < 5; i++) {            center.registerWorker(new WorkerSum());        }        Foreman helloForeman = new ForemanSelectAllWorker("sum", new SumSplitterCombiner());        center.registerForeman(helloForeman);        Warehouse inputWarehouse = new WarehouseDefault();        inputWarehouse.put("start", 1l);        inputWarehouse.put("end", 10000l);        Work work = new WorkDefault("sum", inputWarehouse);        Warehouse outputWarehouse = center.doWork(work);        System.out.println(outputWarehouse.get("sum"));        jobCenter.stop();        center.stop();    }}

注意,输入仓库是两个长整型数,因此,下面两句最后的值是1-10000,而不是11~100001

inputWarehouse.put("start", 1l);        inputWarehouse.put("end", 10000l);
下面是运算输出结果:
50005000

多阶段任务

当然,简单的任务都是一下就干完了的,复杂的工作就需要分成多个阶段进行了。不同的阶段需要的包工头或工人又都是不一定相同的。对于解决这种类型的任务,咱也有相当简单的解决办法。

先造个工人:

public class WorkerHello extends AbstractWorker {    public WorkerHello() throws RemoteException {        super("hello");    }    public Warehouse doWork(Work work) throws RemoteException {        String name = work.getInputWarehouse().get("name");        System.out.println(String.format("id %s: Hello %s", getId(), name));        Warehouse outputWarehouse = new WarehouseDefault();        outputWarehouse.put("name", name + "_1");        return outputWarehouse;    }}

这个工人有点怪,每次都是给名字后面附加一个"_1",然后原样返回。别的没有啥子不同。

EN,然后来做做一系列的工作:

public class TestSerialWork {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        JobCenter jobCenter = new JobCenterLocal();        for (int i = 0; i < 5; i++) {            jobCenter.registerWorker(new WorkerHello());        }        Foreman helloForeman = new ForemanSelectOneWorker("hello");        jobCenter.registerForeman(helloForeman);        Warehouse inputWarehouse = new WarehouseDefault();        inputWarehouse.put("name", "world");        Work work = new WorkDefault("hello", inputWarehouse);        work.setNextWork(new WorkDefault("hello")).setNextWork(new WorkDefault("hello"));        Warehouse warehouse = jobCenter.doWork(work);        System.out.println(warehouse.get("name"));        jobCenter.stop();    }}

与前面的例子唯一的不同就是

work.setNextWork(new WorkDefault("hello")).setNextWork(new WorkDefault("hello"));

这里通过指定下一工作,来建立了一个系列工作,这里定义的工作是三步,下面是运行结果:

id 2a53a967e3b84289beb3dbaf12a7d8be: Hello worldid e3d471c27e264a1a87cf263605bfe9bd: Hello world_1id 2a53a967e3b84289beb3dbaf12a7d8be: Hello world_1_1world_1_1_1

运行结果与预期完全一致。

通过序列工作的方式可以把复杂的工作分解成简单的工作,而且不同的工作可以由不同的包工头和工人来完成。

圆周率计算

圆周率的计算一般来说是比较费时间的,详细fourinone作者在文章 中已经在详细的描述,这里仅采用其文章中所述方法。

public static void main(String[] args){	double pi=0.0;	for(double i=1.0;i<1000000001d;i++){		pi += Math.pow(-1,i+1)/(2*i-1);	}	System.out.println(4*pi);}

来计算,先创建个工人:

public class PiWorker extends AbstractWorker {    public PiWorker() throws RemoteException {        super("pi");    }    @Override    protected Warehouse doWork(Work work) throws RemoteException {        long m = (Long) work.getInputWarehouse().get("start");        long n = (Long) work.getInputWarehouse().get("end");        double pi = 0.0d;        for (double i = m; i < n; i++) {            pi += Math.pow(-1, i + 1) / (2 * i - 1);        }        work.getInputWarehouse().put("pi", 4 * pi);        return work.getInputWarehouse();    }}

再写个拆分合并器:

public class PiSplitterCombiner implements WorkSplitterCombiner {    public List
split(Work work, List
workers) throws RemoteException { List
list = new ArrayList
(); long start = (Long) work.getInputWarehouse().get("start"); long end = (Long) work.getInputWarehouse().get("end"); long count = end - start + 1; long step = count / workers.size(); for (int i = 0; i < workers.size(); i++) { Warehouse subInputWarehouse = new WarehouseDefault(); subInputWarehouse.put("start", step * i + start); if (i == workers.size() - 1) { subInputWarehouse.put("end", end); } else { subInputWarehouse.put("end", step * (i + 1)); } list.add(subInputWarehouse); } return list; } public Warehouse combine(List
warehouseList) throws RemoteException { Warehouse outputWarehouse = new WarehouseDefault(); double pi = 0d; for (Warehouse w : warehouseList) { pi += (Double) w.get("pi"); } outputWarehouse.put("pi", pi); return outputWarehouse; }}

接下来是测试类

public class Test {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        JobCenter jobCenter = new JobCenterLocal();        for (int i = 0; i < 10; i++) {            jobCenter.registerWorker(new PiWorker());        }        Foreman helloForeman = new ForemanSelectAllWorker("pi", new PiSplitterCombiner());        jobCenter.registerForeman(helloForeman);        Warehouse inputWarehouse = new WarehouseDefault();        inputWarehouse.put("start", 1l);        inputWarehouse.put("end", 1000000001l);        Work work = new WorkDefault("pi", inputWarehouse);        Warehouse outputWarehouse = jobCenter.doWork(work);        System.out.println("pi:"+outputWarehouse.get("pi"));        jobCenter.stop();    }}
运行结果:
并行计算运行结果:time:10326ms pi:3.141592694075038单线程计算运行结果time:24857ms pi:3.1415926525880504

这个结果是在本人笔记本跑出来的,笔记本是4核机器,而不是4CPU机器,所以4个并行跑,并没有得到期望的1/4的时间,而是1/2.4左右的时间,因此可以得出两个结论:

结论1:通过并行计算,确实可以缩短计算时间,更好的利用CPU资源。

绪论2:4核和4C还是有显著差异的。

小结

在上面的例子中,我们展示了分布式计算的使用,应该是老小兼宜,简单易懂。

职业介绍所,工人,工头,可以在一台计算机上的,也可以都在一台计算机上。

现在,你可以很牛掰的说,速度慢?哥给你搞个分布式计算不就快了?

下面一讲是,亲们请过去。

转载于:https://my.oschina.net/tinyframework/blog/196070

你可能感兴趣的文章
python中的引用
查看>>
图像旋转公式 旋转中心点
查看>>
Python对文件的操作格式
查看>>
spring 注解要不全写在字段上,要不全写在set方法上,名字还不能写错,省的闹心
查看>>
软件构架实践阅读笔记5
查看>>
CRM项目总结
查看>>
从上往下打印二叉树
查看>>
Material Design(八)--CoordinatorLayout和FloatingActionButton
查看>>
[02]项目实战- 移动端流体布局
查看>>
Ant Design React按需加载
查看>>
[ZHOJ1954]lyd的旅行
查看>>
C++ 智能指针(一)
查看>>
移动端webapp开发必备知识[转]
查看>>
前端之js动画-47
查看>>
CSS中position的absolute和relative的应用
查看>>
树莓派Linux下无线网卡无法获取IP,不识别,等问题
查看>>
双重指针法,分配二维数组内存
查看>>
memcached总结
查看>>
Java基础知识总结二(2)
查看>>
闰年 的方法 和你生存了多少天的 方法
查看>>