什么是blockingcollection<t>
blockingcollection<t> 是一个线程安全的集合,它提供了一种机制,允许一个或多个生产者线程将数据添加到集合中,同时允许一个或多个消费者线程从集合中取出数据。它内部封装了一个线程安全的集合(如 concurrentqueue<t>、concurrentstack<t> 或 concurrentbag<t>),并提供了阻塞和限制集合大小的功能。
主要特点
- 线程安全:内部使用锁或其他同步机制,确保在多线程环境下对集合的操作是安全的。
- 阻塞操作:当集合为空时,消费者线程会阻塞等待,直到有数据可用;当集合达到最大容量时,生产者线程会阻塞等待,直到有空间可用。
- 限制大小:可以通过构造函数指定集合的最大容量。
- 支持多种底层集合:可以使用
concurrentqueue<t>(默认)、concurrentstack<t>或concurrentbag<t>作为底层存储结构。
构造函数
blockingcollection<t> 提供了多种构造方式:
// 使用默认的 concurrentqueue<t>,无容量限制 var blockingcollection = new blockingcollection<int>(); // 使用默认的 concurrentqueue<t>,并指定最大容量 var blockingcollection = new blockingcollection<int>(10); // 指定底层集合类型 var blockingcollection = new blockingcollection<int>(new concurrentstack<int>());
常用方法
生产者操作
add(t item):将一个元素添加到集合中。如果集合已满,会抛出异常。tryadd(t item):尝试将一个元素添加到集合中。如果集合已满,返回false。tryadd(t item, timespan timeout):尝试在指定的超时时间内将元素添加到集合中。completeadding():标记集合不再添加新的元素。消费者线程在集合为空时会收到通知并退出。
消费者操作
take():从集合中取出一个元素。如果集合为空,线程会阻塞等待。trytake(out t item):尝试从集合中取出一个元素。如果集合为空,返回false。trytake(out t item, timespan timeout):尝试在指定的超时时间内从集合中取出一个元素。getconsumingenumerable():返回一个可枚举的集合,消费者可以使用foreach遍历集合中的元素。当调用completeadding()后,枚举会结束。
示例代码
以下是一个简单的生产者-消费者示例,使用 blockingcollection<t> 实现:
using system;
using system.collections.concurrent;
using system.threading;
class program
{
static void main()
{
// 创建一个容量为 5 的 blockingcollection
var blockingcollection = new blockingcollection<int>(5);
// 启动生产者线程
thread producerthread = new thread(() =>
{
for (int i = 1; i <= 10; i++)
{
blockingcollection.add(i); // 添加元素
console.writeline($"producer added: {i}");
thread.sleep(500); // 模拟生产时间
}
blockingcollection.completeadding(); // 标记不再添加元素
});
// 启动消费者线程
thread consumerthread = new thread(() =>
{
foreach (var item in blockingcollection.getconsumingenumerable())
{
console.writeline($"consumer consumed: {item}");
thread.sleep(1000); // 模拟消费时间
}
});
producerthread.start();
consumerthread.start();
producerthread.join();
consumerthread.join();
}
}
输出示例
producer added: 1
producer added: 2
consumer consumed: 1
producer added: 3
consumer consumed: 2
producer added: 4
consumer consumed: 3
producer added: 5
consumer consumed: 4
producer added: 6
consumer consumed: 5
producer added: 7
consumer consumed: 6
producer added: 8
consumer consumed: 7
producer added: 9
consumer consumed: 8
producer added: 10
consumer consumed: 9
consumer consumed: 10
注意事项
- 线程安全:
blockingcollection<t>是线程安全的,但需要确保对集合的操作不会与其他非线程安全的操作混用。 - 容量限制:如果集合满了,生产者线程会阻塞,因此需要合理设置容量。
- 异常处理:在生产者调用
add()或消费者调用take()时,可能会抛出异常(如集合已满或已标记为完成添加)。建议使用tryadd()和trytake()方法来避免异常。blockingcollection<t>是c#中实现线程安全的生产者-消费者模式的利器,它简化了线程同步的复杂性,非常适合多线程编程场景。
串口接收
在使用 blockingcollection<t> 存储串口接收的数据,并在其他线程中取出时,是否能保证数据的顺序,主要取决于以下两个因素:
底层存储的类型
blockingcollection<t> 允许指定底层存储的类型。默认情况下,它使用 concurrentqueue<t> 作为底层存储,而 concurrentqueue<t> 是一个先进先出 fifo的队列。这意味着数据的添加顺序和取出顺序是一致的,因此可以保证顺序。
如果你使用其他类型的底层存储(如 concurrentstack<t> 或自定义的线程安全集合),则顺序可能会有所不同。例如:
concurrentqueue<t>:保证 fifo 顺序。concurrentstack<t>:保证 lifo(后进先出)顺序。
线程安全和并发访问
blockingcollection<t> 是线程安全的,因此即使在多线程环境下,数据的添加和取出操作也是安全的。只要底层存储是 fifo 的(如 concurrentqueue<t>),数据的顺序就能得到保证。
串口数据接收的顺序性
串口通信本身是按字节顺序接收数据的,因此只要数据是逐字节接收并立即添加到 blockingcollection<t> 中,数据的顺序就能得到保证。
示例代码
以下是一个示例,展示如何使用 blockingcollection<t> 存储串口接收的数据,并在其他线程中按顺序取出:
using system;
using system.collections.concurrent;
using system.io.ports;
using system.threading;
class serialportexample
{
private serialport _serialport;
private blockingcollection<string> _dataqueue = new blockingcollection<string>();
public serialportexample(string portname)
{
_serialport = new serialport(portname)
{
baudrate = 9600,
databits = 8,
parity = parity.none,
stopbits = stopbits.one,
readtimeout = 500
};
_serialport.datareceived += serialport_datareceived;
}
private void serialport_datareceived(object sender, serialdatareceivedeventargs e)
{
try
{
string data = _serialport.readline(); // 假设数据以换行符分隔
_dataqueue.add(data); // 将数据添加到阻塞集合
console.writeline($"received and added: {data}");
}
catch (exception ex)
{
console.writeline($"error in datareceived: {ex.message}");
}
}
public void start()
{
_serialport.open();
thread consumerthread = new thread(consumedata);
consumerthread.start();
}
private void consumedata()
{
foreach (var data in _dataqueue.getconsumingenumerable())
{
console.writeline($"consumed: {data}");
// 处理数据
}
}
public void stop()
{
_dataqueue.completeadding();
_serialport.close();
}
static void main()
{
serialportexample example = new serialportexample("com3");
example.start();
console.writeline("press enter to exit...");
console.readline();
example.stop();
}
}
关键点
- 底层存储:使用
concurrentqueue<t>(默认)可以保证数据的 fifo 顺序。 - 线程安全:
blockingcollection<t>是线程安全的,因此在多线程环境下不会出现数据顺序混乱的问题。 - 串口数据接收:只要串口接收的数据是按顺序添加到
blockingcollection<t>中的,顺序就能得到保证。
因此,只要使用默认的 concurrentqueue<t> 作为底层存储,并且正确处理串口数据的接收和添加,blockingcollection<t> 是可以保证数据顺序的。
到此这篇关于c# blockingcollection的使用小结的文章就介绍到这了,更多相关c# blockingcollection内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论