当前位置: 代码网 > it编程>前端脚本>Ruby > Ruby3多线程并行Ractor使用方法详解

Ruby3多线程并行Ractor使用方法详解

2024年05月15日 Ruby 我要评论
ruby 3 ractor官方手册:https://github.com/ruby/ruby/blob/master/doc/ractor.md在ruby3之前,使用thread来创建新的线程,但这种

ruby 3 ractor官方手册:https://github.com/ruby/ruby/blob/master/doc/ractor.md

在ruby3之前,使用thread来创建新的线程,但这种方式创建的多线程是并发而非并行的,mri有一个全局解释器锁gil来控制同一时刻只能有一个线程在执行:

# main thread

t1 = thread.new do 
  # new thread
  sleep 3
end
t1.join

ruby3通过ractor(ruby actor,actor模型通过消息传递的方式来修改状态)支持真正的多线程并行,多个ractor之间可并行独立运行。

# main ractor

# 创建一个可与main ractor并行运行的ractor
r = ractor.new do
  sleep 2
  ractor.yield "hello"
end

puts r.take

需注意,每个ractor中至少有一个原生ruby线程,但每个ractor内部都拥有独立的gil,使得ractor内部在同一时刻最多只能有一个线程在运行。从这个角度来看,ractor实际上是解释器线程,每个解释器线程拥有一个全局解释器锁。

如果main ractor退出,则其他ractor也会收到退出信号,就像main thread退出时,其他thread也会退出一样。

创建ractor

使用ractor.new创建一个ractor实例,创建实例时需指定一个语句块,该语句块中的代码会在该ractor中运行。

r = ractor.new do
  puts "new ractor"
end

可在new方法的参数上为该ractor实例指定名称:

r = ractor.new(name: "ractor1") do
  puts "new ractor"
end

puts r.name  # ractor 1

new方法也可指定其他参数,这些参数必须在name参数之前,且这些参数将直接原样传递给语句块参数:

arr = [11, 22, 33]
r = ractor.new(arr, name: "r1") do |arr|
  puts "arr"
end
sleep 1

关于new的参数,稍后还会有解释。

可使用ractor.current获取当前的ractor实例,使用ractor.count获取当前存活的ractor实例数量。

ractor之间传递消息

ractor传递消息的方式分两种:

  • push方式:向某个特定的ractor实例推送消息,可使用r.send(msg)或别名r << msg向该ractor实例传送消息,并在该ractor实例内部使用ractor.receive或别名ractor.recv或它们的同名私有方法来接收推送进来的消息
    • ractor还提供了ractor.receive_if {expr}方法,表示只在expr为true时才接收消息,receive等价于receive_if {true}
  • pull方式:从某个特定的ractor实例拉取消息,可在该ractor实例内部使用ractor.yield向外传送消息,并在需要的地方使用r.take获取传输出来的消息
    • ractor.new的语句块返回值,相当于ractor.yield,它也可被r.take接收

因此,对于push方式,要求知道消息传递的目标ractor,对于pull方式,要求知道消息的来源ractor。

# yield + take
r = ractor.new {ractor.yield "hello"}
puts r.take

# send + receive
r1 = ractor.new do
  # ractor.receive或ractor.recv
  # 或同名私有方法:receive、recv
  puts ractor.receive
end
r1.send("hello")
r1.take    # 本次take取得r1语句块的返回值,即puts的返回值nil

使用new方法创建ractor实例时,可指定new的参数,这些参数会被原样传递给ractor的语句块参数。

arr = [11, 22, 33]
r = ractor.new(arr) { |arr| ...}

实际上,new的参数等价于在ractor语句块的开头使用了ractor.receive接收消息:

r = ractor.new 'ok' { |msg| msg }
r.take #=> 'ok'

# 基本等价于
r = ractor.new do
  msg = ractor.receive
  msg
end
r.send 'ok'
r.take #=> 'ok'

消息端口

ractor之间传递消息时,实际上是通过ractor的消息端口进行传递的。

每个ractor都有自己的incoming port和outgoing port:

  • incoming port:是该ractor接收消息的端口,r.sendractor.receive使用该端口
    • 每个incoming port都连接到一个大小不限的队列上
    • r.send传入的消息都会写入该队列,由于该队列大小不限,因此r.send从不阻塞
    • ractor.receive从该队列弹出消息,当队列为空时,ractor.receive被阻塞直到新消息出现
    • 可使用r.close_incoming关闭incoming port,关闭该端口后,r.send将直接报错,ractor.receive将先从队列中取数据,当队列为空后,再调用ractor.receive将报错
  • outgoing port:是该ractor向外传出消息的端口,ractor.yieldr.take使用该端口
    • ractor.yield或ractor语句块返回时,消息从outgoing port流出
    • 当没有r.take接收消息时,r内部的ractor.yield将被阻塞
    • 当r内部没有ractor.yield时,r.take将被阻塞
    • ractor.yield从outgoing port传出的消息可被任意多个r.take等待,但只有一个r.take可获取到该消息
    • 可使用r.close_outgoing关闭outgoing port,关闭该端口后,再调用r.takeractor.yield将直接报错。如果r.take正被阻塞(等待ractor.yield传出消息),关闭outgoing port操作将取消所有等待中的take并报错

ractor.select等待消息

可使用ractor.select(r1,r2,r3...)等待一个或多个ractor实例outgoing port上的消息(因此,select主要用于等待ractor.yield的消息),等待到第一个消息后立即返回。

ractor.select的返回值格式为[r, obj],其中:

  • r表示等待到的那个ractor实例
  • obj表示接收到的消息对象

例如:

r1 = ractor.new{'r1'}
r2 = ractor.new{'r2'}
rs = [r1, r2]
as = []

# wait for r1 or r2's ractor.yield
r, obj = ractor.select(*rs)
rs.delete(r)
as << obj

# second try (rs only contain not-closed ractors)
r, obj = ractor.select(*rs)
rs.delete(r)
as << obj
as.sort == ['r1', 'r2'] #=> true

通常来说,会使用ractor.select来轮询等待多个ractor实例的消息,通用化的处理流程参考如下:

# 充当管道功能的ractor:接收消息并发送出去,并不断循环
pipe = ractor.new do
  loop do
    ractor.yield ractor.receive
  end
end

rn = 10
# rs变量保存了10个ractor实例
# 每个ractor实例都从管道pipe中取一次消息然后由本ractor发送出去
rs = rn.times.map{|i|
  ractor.new pipe, i do |pipe, i|
    msg = pipe.take
    msg # ping-pong
  end
}
# 向管道中发送10个数据
rn.times{|i| pipe << i}

# 轮询等待10个ractor实例的outgoing port
# 每等待成功一次,从rs中删除所等待到的ractor实例,
# 然后继续等待剩下的ractor实例
rn.times.map{
  r, n = ractor.select(*rs)
  rs.delete r
  n
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

此外,ractor.select除了可等待消息外,也可以用来yield传递消息,更多用法参考官方手册:ractor.select

ractor并行时如何避免竞态

多个ractor之间是可并行运行的,为了避免ractor之间传递数据时出现竞态问题,ractor采取了一些措施:

  • 对于不可变对象,它们可直接在ractor之间共享,此时传递它们的引用
  • 对于可变对象,它们不可直接在ractor之间共享,此时传递数据时,默认先按字节逐字节拷贝,然后后传递副本
  • 也可以显式指定移动数据,将某份数据从ractor1移动到另一个ractor2中,即转移数据的所有权(参考rust的所有权规则),转移所有权后,原始所有者ractor中将无法再访问该数据

传递可共享对象:传递引用

可共享的对象:自动传递它们的引用,效率高

  • 不可变对象可在ractor之间直接共享(如integer、symbol、true/false、nil),如:
    • i=123:i是可共享的
    • s="str".freeze:s是可共享的
    • h={c: object}.freeze:h是可共享的,因为object是一个类对象,类对象是可共享的
    • a=[1,[2],3].freeze:a不可共享,因为冻结后仍然包含可变的[2]
  • class/module对象,即类对象自身和模块对象自身是可共享的
  • ractor对象自身是可共享的

例如:

i = 33
r = ractor.new do
  m = recv
  puts m.object_id
end

r.send(i)  # 传递i
r.take     # 等待ractor执行结束(语句块返回)
puts i.object_id  # i传递后仍然可用
=begin
67
67
=end

值得注意的是,ractor对象是可共享的,因此可将某个ractor实例传递给另一个ractor实例。例如:

pipe = ractor.new do
  loop do
    ractor.yield ractor.receive
  end
end

rn = 10
rs = rn.times.map{|i|
  # pipe是一个ractor实例,这里作为参数传递给其他的ractor实例
  ractor.new pipe, i do |pipe, i|
    pipe << i
  end
}

rn.times.map{
  pipe.take
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

传递不可共享对象:传递副本

绝大多数对象不是可直接共享的。在ractor之间传递不可共享的对象时,默认会传递deep-copy后的副本,即按字节拷贝的方式拷贝该对象的每一个字节。这种方式效率较低。

例如:

arr = [11, 22, 33]  # 数组是可变的,不可共享
r = ractor.new do
  m = recv
  puts "copied: #{m.object_id}"
end

r.send(arr)  # 传递数组,此时将逐字节拷贝数组
r.take
puts "origin: #{arr.object_id}"

=begin
copied: 60
origin: 80
=end

从结果看,两个ractor内的arr不是同一个对象。

需注意,对于全局唯一的对象来说(比如数值、nil、false、true、symbol),逐字节拷贝时并不会拷贝它们。例如:

arr = %i[lang action sub]
r = ractor.new do
  m = recv
  puts "copied: #{m.object_id}, #{m[0].object_id}, #{m[1].object_id}"
end

r.send(arr)
r.take
puts "origin: #{arr.object_id}, #{arr[0].object_id}, #{arr[1].object_id}"

=begin
copied: 60, 80, 1046748
origin: 100, 80, 1046748
=end

注意,thread对象无法拷贝,因此无法在ractor之间传递。

转移数据所有权

还可以让r.send(msg, move: true)ractor.yield(msg, move: true)传递数据时,明确表示要移动而非拷贝数据,即转移数据的所有权(从原来的所有者ractor实例转移到目标ractor实例)。

无论是可共享还是不可共享的对象,都可以转移所有权,只不过转移可共享对象的所有权没有意义,因为转移之后,原所有者仍然拥有所有权。

因此,通常只对不可共享的数据来转移所有权,转移所有权后,原所有者将无法访问该数据。

str = "hello"
puts str.object_id
r = ractor.new do
  m = recv
  puts m.object_id
end

r.send(str, move: true)  # 转移str的所有权
r.take
#puts str.object_id  # 转移所有权后再访问str,将报错

=begin
60
80
=end

值得注意的是,移动的本质是内存拷贝,它底层也一样是逐字节拷贝原始数据的过程,所以移动传递数据的效率和传递副本数据的效率是类似的。移动传递和传递副本的区别之处在于所有权,移动传递后,原所有者ractor实例将无法访问该数据,而拷贝传递方式则允许原所有者访问

注意,thread对象无法转移所有权,因此无法在ractor之间传递。

不可共享变成可共享:ractor.make_shareable

对于不可共享的数据obj,可通过ractor.make_shareable(obj)方法将其转变为可共享的数据,默认转变的方式是逐层次地递归冻结obj。也可指定额外的参数ractor.make_shareable(obj, copy: true),此时将深拷贝obj得其副本,再让副本(逐层递归冻结)转变为可共享数据。

例如:

arr = %w[lang action sub]
puts arr.object_id
r = ractor.new do
  m = recv
  puts m.object_id
end

r.send(ractor.make_shareable(arr))
r.take
puts arr.object_id
puts arr.frozen?

输出:

60
60
60
true

示例

工作者线程池:

require 'prime'

pipe = ractor.new do
  loop do
    ractor.yield ractor.receive
  end
end

n = 1000
rn = 10
workers = (1..rn).map do
  ractor.new pipe do |pipe|
    while n = pipe.take
      ractor.yield [n, n.prime?]
    end
  end
end

(1..n).each{|i|
  pipe << i
}

pp (1..n).map{
  _r, (n, b) = ractor.select(*workers)
  [n, b]
}.sort_by{|(n, b)| n}

pipeline:

# pipeline with yield/take
r1 = ractor.new do
  'r1'
end

r2 = ractor.new r1 do |r1|
  r1.take + 'r2'
end

r3 = ractor.new r2 do |r2|
  r2.take + 'r3'
end

p r3.take #=> 'r1r2r3'
(0)

相关文章:

  • 安装Ruby和安装Rails详细步骤详解

    安装Ruby和安装Rails详细步骤详解

    rbenv安装rubyrbenv可以管理多个版本的ruby。可以分为3种范围(或者说不同生效作用域)的版本:local版:本地,针对各项目范围(只在某个目录下有... [阅读全文]
  • Ruby生成随机数的方法总结

    Ruby生成随机数的方法总结

    数字实际上不是随机的没有一台计算机能纯粹通过计算产生真正的随机数。它们能做的最好的事情就是生成伪随机数,伪随机数是一组看起来随机但实际上不是随机的数字。对于人类... [阅读全文]
  • Ruby 迭代器及文件的输入与输出

    Ruby 迭代器及文件的输入与输出

    前言本章介绍ruby的迭代器和文件的输入与输出。ruby 迭代器简单来说:迭代(iterate)指的是重复做相同的事,所以迭代器(iterator)就是用来重复... [阅读全文]
  • 最新的CocoaPods安装教程

    最新的CocoaPods安装教程

    cocoapods是什么?当你开发ios应用时,会经常使用到很多第三方开源类库,比如jsonkit,afnetworking等等。可能某个类库又用到其他类库,所... [阅读全文]
  • Ruby正则表达式详解

    Ruby正则表达式详解

    ruby 正则表达式正则表达式是一种特殊序列的字符,它通过使用有专门语法的模式来匹配或查找字符串集合。正则表达式用事先定义好的一些特定字符、及这些特定字符的组合... [阅读全文]
  • Rails实现字段加密存储

    Rails实现字段加密存储

    方案存储前,加密后再存储到数据库读取后,利用 key 进行解密实现activesupport::messageencryptor 是 rails 基于 open... [阅读全文]

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com