前言
最近在测试学习 aws s3 sdk 中的 multi part upload 功能,其基本步骤就是 createmultipartupload
后, 串行或并行地 uploadpart
,最后 completemultipartupload
或 abortmultipartupload
收尾。为了最高效率地完成整个传输,中间的 uploadpart 部分使用多个 goroutine 并发地上传是最快地。因此尝试着写了一下,并完美地实现。
扩展
虽然已经完成对应功能的开发和测试,但仔细分析一下,发现有大量的模式代码,比如:
- 创建指定个数的 goroutine, 并使用
sync.waitgroup
管理和同步. - 使用 chan 提供待处理数据,并接受处理结果
- 看起来整个处理流程就是典型的 map-reduce 结构 或者 说是 java stream/parallelstream 中的 map, reduce.
网上搜索一下, 发现很多人也有这个需求,也写了不少库,但实测了一下,发现根本不好用。于是决定自己再造一个“自己觉得比较好的”轮子,因此有了 mapreduce 和本篇文章。
主要功能函数
func map[t any, r any](ctx context.context, inputs []t, mapper mapperfunc[t, r]) resultsmap[t, r]
- 这是最简单的同步 map, 通过泛型的 t 和 r 支持任意类型的数据转换
func parallelmap[t any, r any](ctx context.context, inputs []t, concurrency int, name string, mapper mapperfunc[t, r]) resultsmap[t, r]
- 这是并发的map,内部回启动最多 2+concurrency 个 goroutine, 并发的处理完
inputs
中的所有数据. 并且结果可以按照输入的顺序排序。 func streammap[t any, r any](ctx context.context, concurrency int, name string, startindex int64, chinput chan t, mapper mapperfunc[t, r]) chan *outputitem[t, r]
类似纤程池的形态, 可以无限地处理 chinput
中的数据,并将结果写入 outputitem
.
额外说明
错误处理
作为一个并发处理框架,对于错误情况也应该能很好的支持,有的时候, 一项元素处理失败了不影响整体的流程处理, 但有的时候其中一项失败, 就不需要继续进行(比如 s3 的 multi part upload, 如果其中一部分失败, 那其他的部分再上传也没有意义了)。因此代码中定义了 operationtype
类型, 其值分为continue
或 stop
, 框架只根据这个值确认是否继续处理, 而不是根据 mapper 函数是否返回 error.
结果返回
并发处理时, 每个 item 的处理时长/顺序等是不同的,而且有可能因为错误造成部分输入元素尚未处理即结束,因此返回的结果默认情况下不一定能和输入顺序一一对应,因此采用了 map 的方式保存输入序号 => 结果。
排序
s3 的 multi part upload 在调用 completemultipartupload
时参数 parts
需要是排好序的,因此通过 convertresult
函数对结果进行排序。
测试代码
注意: 并发处理带错误数据的时候, 由于错误项的处理顺序比较随机, 因此我使用了 concurrency: 1
的方式保证 ut 能顺利判断。如果将 concurrency
更改为大于1的情况, 其 want 不一定能满足. 比如: “error with stop” 时, 如果 concurrency
> 1, 结果有可能就不是 [1 2 3 0]
而是 [1 2 3 0 4 5]
了, 这种属于正常现象(自己更改测试一下即可理解 )
func testmap(t *testing.t) { type args struct { ctx context.context inputs []string concurrency int mapper mapperfunc[string, int] } tests := []struct { name string args args want []int wanterrs []error optype operationtype }{ { name: "all successful", args: args{ctx: context.background(), inputs: []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}, concurrency: runtime.numcpu(), mapper: convertstopfunc, }, want: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, wanterrs: []error{nil, nil, nil, nil, nil, nil, nil, nil, nil, nil}, optype: continue, }, { name: "error with continue", args: args{ctx: context.background(), inputs: []string{"1", "not", "3"}, concurrency: 1, mapper: convertcontinuefunc}, want: []int{1, 0, 3}, wanterrs: []error{nil, numbererrhelper("not"), nil}, optype: continue, // 出现过错误,但忽略了. 如果采用 continue 的方式来处理错误, 则只能自己遍历 resultsmap 的结果集才知道是否有错误 }, { // 注意: 如果并发度 concurrency > 1, 则结果个数不确定, 但肯定至少有一个错误的 name: "error with stop", args: args{ctx: context.background(), inputs: []string{"1", "2", "3", "not_exist", "4", "5", "6"}, concurrency: 1, mapper: convertstopfunc}, want: []int{1, 2, 3, 0}, wanterrs: []error{nil, nil, nil, numbererrhelper("not_exist")}, optype: stop, }, { name: "error last", // 最后一个数据出错时,其返回的结果数组长度和输入数组的长度相同. 因此不能依靠数组长度来判断是否有问题. args: args{ctx: context.background(), inputs: []string{"1", "2", "3", "not_exist"}, concurrency: 1, mapper: convertstopfunc}, want: []int{1, 2, 3, 0}, wanterrs: []error{nil, nil, nil, numbererrhelper("not_exist")}, optype: stop, }, } for _, tt := range tests { t.run(tt.name, func(t *testing.t) { if true { //使用 map 串行转换 got := map(tt.args.ctx, tt.args.inputs, tt.args.mapper) //flog.infof("map name=%s, got=%+v", tt.name, got) realresult, errs, optype := got.convertresult() du.goassertequal(t, tt.want, realresult, "want") du.goassertequal(t, tt.wanterrs, errs, "wanterrs") du.goassertequal(t, tt.optype, optype, "optype") } if true { //使用 parallelmap 并行转换 got := parallelmap(tt.args.ctx, tt.args.inputs, tt.args.concurrency, tt.name, tt.args.mapper) //flog.infof("parallelmap name=%s, got=%+v", tt.name, got) realresult, errs, optype := got.convertresult() du.goassertequal(t, tt.want, realresult, "want") du.goassertequal(t, tt.wanterrs, errs, "wanterrs") du.goassertequal(t, tt.optype, optype, "optype") } }) } } func teststreammap(t *testing.t) { ctx := context.background() initemcount := 10000 chinput := make(chan string) go func() { for i := 0; i < initemcount; i++ { idx := rand.intn(100) chinput <- fmt.sprintf("%d", idx) } close(chinput) }() //启动 100 个 纤程并行处理 initemcount(10000) 个数据的转换 choutput := streammap(ctx, 100, "teststreammap", 100, chinput, convertcontinuefunc) mapresultcount := 0 for outitem := range choutput { mapresultcount++ flog.debugf("outitem=%v", outitem) } du.goassertequal(t, initemcount, mapresultcount, "initemcount") }
##补充信息
- 因为众所周知的原因, 以后 go-library 的代码将只更新 https://gitee.com/fishjam/go-library, 不再更新 github 上的版本.
- s3 的 multi upload 不需要大家自己写,
manager.newuploader
已经提供了完整的实现, 比大多数人实现得更好。
到此这篇关于go通用的 mapreduce 工具函数的文章就介绍到这了,更多相关go mapreduce 工具函数内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论