1.安装环境
pip install pymongo locust
2.设置测试环境
启动 MongoDB 服务

打开 Navicat，新建 MongoDB 连接

新建test数据库和sample集合
3.编写脚本
load_mongo.py
# coding=utf-8
import logging
import random
import string

from locust import User, between, events, task
from pymongo import MongoClient
# Logging setup: INFO level on the root logger, plus a named logger for this script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mongodb load test")
# MongoDB connection settings used by the client wrapper below.
# NOTE(review): credentials are hard-coded for a local demo instance; move them
# to environment variables before pointing this at anything shared.
mongo_uri = "mongodb://admin:admin@localhost:27017/?authsource=admin"
database_name = "test"
collection_name = "sample"
# Custom MongoDB client
class mongodbclient:
    """Small wrapper around a PyMongo client bound to a single collection.

    The connection target comes from the module-level ``mongo_uri``,
    ``database_name`` and ``collection_name`` settings.

    Attributes:
        client: The underlying ``MongoClient``, or ``None`` if setup failed
            or the connection was closed.
        db: Database handle, or ``None`` if setup failed.
        collection: Collection handle, or ``None`` if setup failed.
    """

    def __init__(self):
        """Create the client and resolve the database and collection handles.

        Setup errors are logged rather than raised; afterwards every handle
        is ``None`` so the other methods can detect the failure.
        """
        # Define every handle up front: insert_document() checks
        # self.collection, which must exist even when setup fails.
        self.client = None
        self.db = None
        self.collection = None
        try:
            # NOTE(review): per the PyMongo docs the constructor does not block
            # on connecting, so success here does not prove the server is
            # reachable -- confirm whether an explicit ping is wanted.
            self.client = MongoClient(mongo_uri)
            self.db = self.client[database_name]
            self.collection = self.db[collection_name]
            logger.info("mongodb client initialized successfully.")
        except Exception as e:
            logger.error(f"failed to connect to mongodb: {e}")
            # Reset all handles so a partially built client is never used.
            self.client = None
            self.db = None
            self.collection = None

    def insert_document(self, document):
        """Insert ``document`` into the collection.

        Args:
            document: Mapping to store as a single document.

        If the collection handle is unavailable, an error is logged and
        nothing is inserted. Exceptions raised by ``insert_one`` propagate
        to the caller.
        """
        if self.collection is not None:
            self.collection.insert_one(document)
            logger.info("document inserted successfully.")
        else:
            logger.error("mongodb collection is not available.")

    def close_connection(self):
        """Close the underlying client if one was created."""
        if self.client is not None:
            self.client.close()
            logger.info("mongodb client connection closed.")
# Locust user class
class mongodbuser(User):
    """Simulated user that inserts random documents into MongoDB.

    All simulated users share one ``mongodbclient`` stored on the class.
    It is created when the test starts and closed when the test stops.
    """

    # Each user waits between 1 and 2 seconds between tasks.
    wait_time = between(1, 2)
    # Shared client; None until the test_start listener has run.
    mongo_client = None

    @staticmethod
    def initialize_mongo_client():
        """Create the shared client if it does not exist yet."""
        if mongodbuser.mongo_client is None:
            logger.info("initializing mongodb client...")
            mongodbuser.mongo_client = mongodbclient()

    @staticmethod
    def cleanup_mongo_client():
        """Close and discard the shared client, if any."""
        if mongodbuser.mongo_client is not None:
            mongodbuser.mongo_client.close_connection()
            mongodbuser.mongo_client = None
            logger.info("mongodb client cleaned up.")

    # Listen for test start.
    # Decorators apply bottom-up: add_listener registers the plain function,
    # then staticmethod wraps the name that stays on the class.
    @staticmethod
    @events.test_start.add_listener
    def on_test_start(environment, **kwargs):
        """Create the shared client when the load test starts."""
        mongodbuser.initialize_mongo_client()

    # Listen for test stop.
    @staticmethod
    @events.test_stop.add_listener
    def on_test_stop(environment, **kwargs):
        """Release the shared client when the load test stops."""
        mongodbuser.cleanup_mongo_client()

    @task(2)
    def insert_data(self):
        """Simulate an insert by storing one randomly generated document."""
        if mongodbuser.mongo_client is None:
            logger.error("mongodb client is not initialized.")
            return
        document = {
            "name": ''.join(random.choices(string.ascii_letters, k=10)),
            "age": random.randint(18, 65),
            "address": ''.join(random.choices(string.ascii_letters + string.digits, k=20)),
        }
        mongodbuser.mongo_client.insert_document(document)
        # Lazy %-formatting produces the same message text as the f-string.
        logger.info("inserted document: %s", document)


# Article section heading that was fused onto the last code line: "4.运行测试" (4. Run the test)
打开终端执行命令
locust -f load_mongo.py --headless -u 5 -r 1 -t 5s
测试结果

打开 Navicat 查看 sample 集合，可以看到文档数量增加

以上就是使用 Locust 对 MongoDB 进行负载测试的操作步骤的详细内容，更多关于 Locust 对 MongoDB 负载测试的资料请关注代码网其它相关文章！
发表评论