先演示一下效果,再展示代码逻辑。


中间几次调用过程省略。。。
暂时只用到了下面四个项目

1.产品展示页面中第一次通过接口去获取数据库的列表数据
/// <summary>
/// 获取指定的商品目录
/// </summary>
/// <param name="pagesize"></param>
/// <param name="pageindex"></param>
/// <param name="ids"></param>
/// <returns></returns>
[httpget]
[route("items")]
[producesresponsetype(typeof(paginatedviewmodel<catalog>), statuscodes.status200ok)]
[producesresponsetype(typeof(ienumerable<productdto>), statuscodes.status200ok)]
[producesresponsetype(statuscodes.status400badrequest)]
public async task<iactionresult> catalogs([fromquery] int pagesize = 10, [fromquery] int pageindex = 0, string ids = null)
{
if (!string.isnullorempty(ids))
{
var items = await getitembyids(ids);
if (!items.any())
{
return badrequest("ids value invalid. must be comma-separated list of numbers");
}
return ok(items);
}
var totalitems = await _catalogcontext.catalogs
.longcountasync();
var itemsonpage = await _catalogcontext.catalogs
.orderby(c => c.name)
.skip(pagesize * pageindex)
.take(pagesize)
.tolistasync();
var result = itemsonpage.select(x => new productdto(x.id.tostring(), x.name, x.price.tostring(), x.stock.tostring(), x.imgpath));
var model = new paginatedviewmodel<productdto>(pageindex, pagesize, totalitems, result);
return ok(model);
}2.在前端页面会把当前页面的产品列表id都发送到websocket中去
function updateandsendproductids(ids) {
productids = ids;
// check if the websocket is open
if (socket.readystate === websocket.open) {
// send the list of product ids through the websocket connection
socket.send(json.stringify(productids));
}
}
function fetchdata() {
const apiurl = baseurl + `/catalog/items?pagesize=${pagesize}&pageindex=${currentpage}`;
axios.get(apiurl)
.then(response => {
const data = response.data.data;
displayproducts(baseurl, data);
const newproductids = data.map(product => product.id);
// check if the websocket is open
updateandsendproductids(newproductids);
// 从响应中获取总页数
const totalpages = math.ceil(response.data.count / pagesize);
displaypagination(totalpages);
// 更新当前页数的显示
const currentpageelement = document.getelementbyid('currentpage');
currentpageelement.textcontent = `当前页数: ${currentpage + 1} / 总页数: ${totalpages}`;
})
.catch(error => {
console.error('获取数据失败:', error);
});
}3.websocket拿到了id数据可以精确的把当前页面的产品都查出来再推送给product.html页面,通过下面的receiveasync方法获取html发送的数据,再通过timer定时器每秒钟send方法实时的往页面发送获取到的数据,当然这个是不断的去从redis中去查的。
using system.net.websockets;
using system.threading.tasks;
using system;
using wsserver.handler;
using wsserver.manager;
using stackexchange.redis;
using microsoft.extensions.configuration;
using system.collections.generic;
using catalogs.domain.catalogs;
using catalogs.domain.dtos;
using system.net.sockets;
namespace webscoket.server.services
{
/// <summary>
/// 实时推送产品主要是最新的库存,其他信息也会更新
/// </summary>
public class productlisthandler : websockethandler
{
private system.threading.timer _timer;
private readonly idatabase _redisdb;
//展示列表推送
private string productidsstr;
public productlisthandler(websocketconnectionmanager websocketconnectionmanager,iconfiguration configuration) : base(websocketconnectionmanager)
{
connectionmultiplexer redis = connectionmultiplexer.connect(configuration["distributedredis:connectionstring"] ?? throw new exception("$未能获取distributedredis连接字符串"));
_redisdb = redis.getdatabase();
_timer = new system.threading.timer(send, null, timespan.zero, timespan.fromseconds(1));
}
private void send(object state)
{
// 获取当前时间并发送给所有连接的客户端
if (productidsstr != null)
{
string[] productids = system.text.json.jsonserializer.deserialize<string[]>(productidsstr);
string hashkeytoretrieve = "products";
list<productdto> products = new list<productdto>();
foreach (var productid in productids)
{
if(productid == "null") {
continue;
}
string retrievedproductvalue = _redisdb.hashget(hashkeytoretrieve, productid);
if (!string.isnullorempty(retrievedproductvalue))
{
//反序列化和构造函数冲突,改造了一下catalog
catalog catalog = system.text.json.jsonserializer.deserialize<catalog>(retrievedproductvalue);
products.add(new productdto(catalog.id.tostring(), catalog.name, catalog.price.tostring(), catalog.stock.tostring(), catalog.imgpath));
}
}
if (products.count > 0)
{
sendmessagetoallasync(system.text.json.jsonserializer.serialize(products)).wait();
}
else
{
sendmessagetoallasync("noproduct").wait();
}
}
}
public override async task receiveasync(websocket socket, websocketreceiveresult result, byte[] buffer)
{
//每次页面有刷新就会拿到展示的id列表
productidsstr = system.text.encoding.utf8.getstring(buffer, 0, result.count);
}
}
}4.html页面就可以拿到最新数据再去绑定到页面
socket.addeventlistener('message', (event) => {
if (event.data == "noproduct") {
clearproductlist();
}
// handle the received product data and update the product list
const productdata = json.parse(event.data);
// update the product list with the received data (call your displayproducts function)
displayproducts(baseurl, productdata);
});整个流程就这么简单,但是这里需要保持数据库和redis的数据实时同步,否则页面展示的就不是最新的数据就没意义了。
再回到catalog.service服务中。
private async task deletecache()
{
//await _redisdb.hashdeleteasync("products",id); //没必要了
await _channel.writer.writeasync("delete_catalog_fromredis");
}再做更新、新增、删除等动作的时候就调用一下deletecache方法,往后台服务发送一个channel,当后台收到后就做redis删除并且从初始化sqlserver到redis列表同步的操作
using system.reflection;
using system.threading.channels;
using catalogs.infrastructure.database;
using microsoft.entityframeworkcore;
using microsoft.extensions.hosting;
using microsoft.extensions.logging;
using stackexchange.redis;
namespace catalogs.webapi.backgroudservices
{
/// <summary>
/// 记得任何删除了或者购买了产品后需要删除改产品的键
/// </summary>
public class initproductlisttoredisservice : backgroundservice
{
private readonly iservicescopefactory _servicescopefactory;
private readonly idatabase _redisdb;
private readonly channel<string> _channel;
private readonly ilogger _logger;
public initproductlisttoredisservice(iservicescopefactory servicescopefactory, iconfiguration configuration, channel<string> channel, ilogger<initproductlisttoredisservice> logger)
{
_servicescopefactory = servicescopefactory;
connectionmultiplexer redis = connectionmultiplexer.connect(configuration["distributedredis:connectionstring"] ?? throw new exception("$未能获取distributedredis连接字符串"));
_redisdb = redis.getdatabase();
_channel = channel;
_logger = logger;
}
protected override async task executeasync(cancellationtoken stoppingtoken)
{
await init();
while (!_channel.reader.completion.iscompleted)
{
var msg = await _channel.reader.readasync();
if(msg == "delete_catalog_fromredis")
{
await init();
}
}
}
private async task init()
{
using var scope = _servicescopefactory.createscope();
try
{
catalogcontext _context = scope.serviceprovider.getrequiredservice<catalogcontext>();
string hashkey = "products";
var products = await _context.catalogs.tolistasync();
await _redisdb.keydeleteasync(hashkey);
foreach (var product in products)
{
string productfield = product.id.tostring();
string productvalue = system.text.json.jsonserializer.serialize(product);
_redisdb.hashset(hashkey, new hashentry[] { new hashentry(productfield, productvalue) });
}
_logger.loginformation($"productlist is over stored in redis hash.");
}
catch(exception ex)
{
_logger.logerror($"productlis stored in redis hash error.");
}
}
}
}这里还有优化的空间可以只针对怕products的hashset的某个id去更新、删除、新增一条数据。
示例代码:
liuzhixin405/efcore-template (github.com)
到此这篇关于aspnetcore使用websocket实时更新商品信息的文章就介绍到这了,更多相关aspnetcore实时更新商品信息内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论