当前位置: 代码网 > it编程>编程语言>Java > tp5+workman(GatewayWorker) 安装及使用

tp5+workman(GatewayWorker) 安装及使用

2024年08月06日 Java 我要评论
gateway-worker

一、安装thinkphp5

1、宝塔删除php禁用函数putenv、pcntl_signal_dispatch、pcntl_wai、pcntl_signal、pcntl_alarm、pcntl_fork,执行安装命令。

composer create-project topthink/think=5.0.* tp5  --prefer-dist

2、配置好站点之后,浏览器打开访问成功。

二、tp5安装gatewayworker

1、进入tp5目录,安装gatewayworker

composer require workerman/gateway-worker

如果报错安装指定版本

2、安装workman

composer require workerman/workerman

如果报错安装指定版本

3、安装gatewayclient

composer require workerman/gatewayclient

如果报错安装指定版本

三、使用gatewayworker  

注:我已修改默认端口号,在宝塔开启端口号

1、创建文件   tp5/public/start.php

<?php
/**
 * run with command
 * php start.php start
 */

ini_set('display_errors', 'on');
use workerman\worker;

if(strpos(strtolower(php_os), 'win') === 0)
{
    exit("start.php not support windows, please use start_for_win.bat\n");
}

// 检查扩展
if(!extension_loaded('pcntl'))
{
    exit("please install pcntl extension. see http://doc3.workerman.net/appendices/install-extension.html\n");
}

if(!extension_loaded('posix'))
{
    exit("please install posix extension. see http://doc3.workerman.net/appendices/install-extension.html\n");
}

// 标记是全局启动
define('global_start', 1);

require_once __dir__ . '/../vendor/autoload.php';
// 加载所有applications/*/start.php,以便启动所有服务 application更改为自己文件夹名字,我的为websocket
foreach(glob(__dir__.'/../api/websocket/start*.php') as $start_file)
{
    require_once $start_file;
}
// 运行所有服务
worker::runall();

2、创建文件  tp5/api/socket/events.php  (创建php文件,或者下载demo复制过去即可)

<?php
/**
 * this file is part of workerman.
 *
 * licensed under the mit license
 * for full copyright and license information, please see the mit-license.txt
 * redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php mit license
 */

/**
 * 用于检测业务代码死循环或者长时间阻塞等问题
 * 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
 * 然后观察一段时间workerman.log看是否有process_timeout异常
 */
//declare(ticks=1);

use \gatewayworker\lib\gateway;

/**
 * 主逻辑
 * 主要是处理 onconnect onmessage onclose 三个方法
 * onconnect 和 onclose 如果不需要可以不用实现并删除
 */
class events
{
    /**
     * 当客户端连接时触发
     * 如果业务不需此回调可以删除onconnect
     * 
     * @param int $client_id 连接id
     */
    public static function onconnect($client_id)
    {
        echo "【新的客户端链接】:client_id:".$client_id.php_eol;
        // 向当前client_id发送数据 
        gateway::sendtoclient($client_id, "hello $client_id\r\n");
        // 向所有人发送
        gateway::sendtoall("$client_id login\r\n");
    }
    
   /**
    * 当客户端发来消息时触发
    * @param int $client_id 连接id
    * @param mixed $message 具体消息
    */
   public static function onmessage($client_id, $message)
   {
        // 向所有人发送 
        gateway::sendtoall("$client_id said $message\r\n");
   }
   
   /**
    * 当用户断开连接时触发
    * @param int $client_id 连接id
    */
   public static function onclose($client_id)
   {
       // 向所有人发送 
       gateway::sendtoall("$client_id logout\r\n");
   }
}

3、创建文件tp5/application/socket/start_businessworker.php

<?php
/**
 * this file is part of workerman.
 *
 * licensed under the mit license
 * for full copyright and license information, please see the mit-license.txt
 * redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php mit license
 */
use workerman\worker;
use workerman\webserver;
use gatewayworker\gateway;
use gatewayworker\businessworker;
use workerman\autoloader;

// 自动加载类
require_once __dir__ . '/../../vendor/autoload.php';
require_once __dir__ . '/events.php';


// bussinessworker 进程
$worker = new businessworker();
// worker名称
$worker->name = 'yourappbusinessworker';
// bussinessworker进程数量
$worker->count = 4;
// 服务注册地址
$worker->registeraddress = '127.0.0.1:23222';

//这行代码防止出现报错:waring: events::onmessage is not callable
$worker->eventhandler = 'events';



// 如果不是在根目录启动,则运行runall方法
if(!defined('global_start'))
{
    worker::runall();
}

4、创建文件tp5/application/socket/start_gateway.php

<?php
/**
 * this file is part of workerman.
 *
 * licensed under the mit license
 * for full copyright and license information, please see the mit-license.txt
 * redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php mit license
 */
use \workerman\worker;
use \workerman\webserver;
use \gatewayworker\gateway;
use \gatewayworker\businessworker;
use \workerman\autoloader;

// 自动加载类
require_once __dir__ . '/../../vendor/autoload.php';

// gateway 进程,这里使用text协议,可以用telnet测试
$gateway = new gateway("websocket://0.0.0.0:24222");
// gateway名称,status方便查看
$gateway->name = 'moods';
// gateway进程数
$gateway->count = 4;
// 本机ip,分布式部署时使用内网ip
$gateway->lanip = '127.0.0.1';
// 内部通讯起始端口,假如$gateway->count=4,起始端口为4000
// 则一般会使用4000 4001 4002 4003 4个端口作为内部通讯端口
$gateway->startport = 2900;
// 服务注册地址
$gateway->registeraddress = '127.0.0.1:23222';

// 心跳间隔
//$gateway->pinginterval = 1;
// 心跳数据
//$gateway->pingdata = '{"type":"ping"}';

/*
// 当客户端连接上来时,设置连接的onwebsocketconnect,即在websocket握手时的回调
$gateway->onconnect = function($connection)
{
    $connection->onwebsocketconnect = function($connection , $http_header)
    {
        // 可以在这里判断连接来源是否合法,不合法就关掉连接
        // $_server['http_origin']标识来自哪个站点的页面发起的websocket链接
        if($_server['http_origin'] != 'http://kedou.workerman.net')
        {
            $connection->close();
        }
        // onwebsocketconnect 里面$_get $_server是可用的
        // var_dump($_get, $_server);
    };
};
*/

// 如果不是在根目录启动,则运行runall方法
if(!defined('global_start'))
{
    worker::runall();
}

5、创建文件tp5/application/socket/start_register.php

<?php
/**
 * this file is part of workerman.
 *
 * licensed under the mit license
 * for full copyright and license information, please see the mit-license.txt
 * redistributions of files must retain the above copyright notice.
 *
 * @author walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link http://www.workerman.net/
 * @license http://www.opensource.org/licenses/mit-license.php mit license
 */
use \workerman\worker;
use \gatewayworker\register;

// 自动加载类
require_once __dir__ . '/../../vendor/autoload.php';

// register 必须是text协议
$register = new register('text://0.0.0.0:23222');

// 如果不是在根目录启动,则运行runall方法
if(!defined('global_start'))
{
    worker::runall();
}

6、启动websocket程序

1、防火墙打开8282、1236端口,执行下面命令

//运行

php start.php start


//linux运行

php start.php start -d


//停止


php start.php stop


//检测端口是否以被占用
netstat -an | grep 80

//关闭某个进程
sudo kill -9 进程id

如果修改文件后一定要先停止在运行一下文件,否则不生效

四、使用gatewayworker发布广播

1、创建文件tp5/application/index/controller/index.php,执行这个方法就可以向所有人发布广播了。

<?php
namespace app\index\controller;
use gatewayclient\gateway;
class index
{
    public function index()
    {
        gateway::sendtoall(" index发的消息 \r\n");
    }
}

后面逻辑,自己处理即可

测试发信息内容为

 测试地址: http://www.jsons.cn/websocket/

用户1

用户2

下面是我写的一个例子

数据库:

set names utf8mb4;
set foreign_key_checks = 0;

-- ----------------------------
-- table structure for cmf_chat
-- ----------------------------
drop table if exists `cmf_chat`;
create table `cmf_chat`  (
  `id` int(11) not null auto_increment,
  `user_ids` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null comment '用于查找聊天记录',
  `operation_type` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null comment '操作类型:send_message发送信息,login登录',
  `send_uid` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null comment '发送人',
  `send_client_id` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null,
  `to_uid` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null comment '收信息人',
  `to_client_id` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null,
  `openid` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null comment 'openid',
  `content_type` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null comment '发送信息类型:text文本',
  `content` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null comment '发送信息',
  `signature` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null comment '唯一签名',
  `date` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null comment '日期',
  `time` bigint(20) null default null comment '时间',
  `create_time` bigint(20) null default null comment '创建时间',
  `update_time` bigint(20) null default null comment '更新时间',
  `delete_time` bigint(20) null default 0 comment '软删除',
  `me_client_id` varchar(255) character set utf8mb4 collate utf8mb4_general_ci null default null,
  primary key (`id`) using btree
) engine = innodb auto_increment = 100 character set = utf8mb4 collate = utf8mb4_general_ci comment = '存入聊天记录' row_format = dynamic;

-- ----------------------------
-- records of cmf_chat
-- ----------------------------

set foreign_key_checks = 1;

1.安装mysql,插架

使用workerman/mysql 扩展

composer require workerman/mysql

2.处理逻辑   events.php

<?php
/**
 * this file is part of workerman.
 *
 * licensed under the mit license
 * for full copyright and license information, please see the mit-license.txt
 * redistributions of files must retain the above copyright notice.
 *
 * @author    walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php mit license
 */

/**
 * 用于检测业务代码死循环或者长时间阻塞等问题
 * 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
 * 然后观察一段时间workerman.log看是否有process_timeout异常
 */

//declare(ticks=1);

use \gatewayworker\lib\gateway;


/**
 * 主逻辑
 * 主要是处理 onconnect onmessage onclose 三个方法
 * onconnect 和 onclose 如果不需要可以不用实现并删除
 */


error_reporting(0);


class events
{
    /**
     * 新建一个类的静态成员,用来保存数据库实例
     */
    public static $db = null;

    /**
     * 进程启动后初始化数据库连接 两者都可以
     */
    public static function onworkerstart($worker)
    {
        //使用gateway_worker扩展
        self::$db = new \gatewayworker\lib\dbconnection('47.****.188', '3306', 'kf***od_com', 'gyt***4jb65cd', 'kf***d_com');

        //使用workerman/mysql 扩展
        //        self::$db = new \workerman\mysql\connection('host', 'port', 'user', 'password', 'db_name');
    }

    /**
     * 当客户端连接时触发
     * 如果业务不需此回调可以删除onconnect
     *
     * @param int $client_id 连接id
     */
    public static function onconnect($client_id)
    {
        // 向当前client_id发送数据
        $restult = [
            'operation_type' => 'login',
            'me_client_id'   => $client_id,
            'content_type'   => "text",
            'content'        => "$client_id login success",
            'signature'      => cmf_random_string(100),
            'date'           => date('y-m-d h:i:s'),
            'time'           => time(),
            'create_time'    => time(),
        ];

        //存入数据库
        self::$db->insert('cmf_chat')->cols($restult)->query();

        //转换为json格式
        $send_message = json_encode($restult, json_unescaped_slashes | json_unescaped_unicode);


        //给自己发送注册成功
        gateway::sendtoclient($client_id, $send_message);


        // 向所有人发送
        //gateway::sendtoall("$client_id login\r\n");
    }

    /**
     * 当客户端发来消息时触发
     * @param int   $client_id 连接id
     * @param mixed $message   具体消息
     */
    public static function onmessage($client_id, $message)
    {
        //写个日志文件
        $contents = json_encode($message, json_unescaped_slashes | json_unescaped_unicode);
        //写入日志
        $filename = '../api/websocket/log/';
        !is_dir($filename) && mkdir($filename, 0755, true);
        $file_hwnd = fopen($filename . date('y-m-d') . ".log", "a+");
        fwrite($file_hwnd, "$client_id----$contents" . "\r\n");
        fclose($file_hwnd);


        //数据格式转为数组
        $message = json_decode($message, true);


        // 向指定人发送
        $restult = [
            'operation_type' => 'send_message',
            'send_client_id' => isset($message['send_client_id']) ? $message['send_client_id'] : '',//用户client_id
            'send_uid'       => isset($message['send_uid']) ? $message['send_uid'] : '',//或者用户uid
            'to_client_id'   => isset($message['to_client_id']) ? $message['to_client_id'] : '',
            'to_uid'         => isset($message['to_uid']) ? $message['to_uid'] : '',
            'openid'         => isset($message['openid']) ? $message['openid'] : '',
            'content_type'   => "text",
            'content'        => $message['content'],
            'signature'      => cmf_random_string(100),
            'date'           => date('y-m-d h:i:s'),
            'time'           => time(),
            'create_time'    => time(),
        ];

        //存入数据库
        self::$db->insert('cmf_chat')->cols($restult)->query();


        //转换为json格式
        $send_message = json_encode($restult, json_unescaped_slashes | json_unescaped_unicode);


        //给指定人发信息
        if ($restult['to_client_id']) gateway::sendtoclient($restult['to_client_id'], $send_message);
        if ($restult['to_uid']) gateway::sendtouid($restult['to_uid'], $send_message);


        //在给自己信息同步一下
        if ($restult['send_client_id']) gateway::sendtoclient($restult['send_client_id'], $send_message);
        if ($restult['send_uid']) gateway::sendtouid($restult['send_uid'], $send_message);


        //向所有人发送
        //gateway::sendtoall("$send_message content\r\n");
    }

    /**
     * 当用户断开连接时触发
     * @param int $client_id 连接id
     */
    public static function onclose($client_id)
    {
        // 向所有人发送
        //gateway::sendtoall("$client_id logout\r\n");
    }


    /**
     * 发送请求,将数据存入数据库中
     * @param $url
     * @param $data
     */
    public function add_chat($url, $data)
    {
        $ch = curl_init($url);
        curl_setopt($ch, curlopt_post, 1);
        curl_setopt($ch, curlopt_postfields, $data);
        curl_setopt($ch, curlopt_returntransfer, true);

        $response = curl_exec($ch);
        //        if ($response === false) {
        //            echo 'curl error: ' . curl_error($ch);
        //        } else {
        //            echo 'response: ' . $response;
        //        }

        curl_close($ch);
    }

}

3.前端例子

<!doctype html>
<html>
	<head>
		<title>websocket example</title>
		<script>
			// 创建websocket连接  
			var socket = new websocket("ws://47.94.223.188:24222");

			// 连接打开时触发的事件处理函数  
			socket.onopen = function(event) {
				console.log("连接已建立");
			};

			// 发送消息到服务器  
			function sendmessage() {
				var messageinput = document.getelementbyid("message");
				var message = messageinput.value;


				//获取自己的client_id
				var me_client_id = document.getelementbyid("me_client_id").value;


				//发送信息 
				var send_message = {
					'operation_type': 'send_message',
					'send_client_id': me_client_id,
					//'send_uid': me_client_id,
					'to_uid': 'w001', 
					'openid': 1,
					'content_type': 'text',
					'content': message,
				}
				const jsonstring = json.stringify(send_message);
	  
			 



				socket.send(jsonstring);
				// messageinput.value = "";
			}

			// 接收到消息时触发的事件处理函数  
			socket.onmessage = function(event) {
				var messagecontainer = document.getelementbyid("message-container");
				var messageelement = document.createelement("p");

 
				//返回数据&处理成数组格式
				var result = event.data;
				var jsonobject = json.parse(result);
				//如果类型为 operation_type==login 存一下me_client_id
				if (jsonobject['operation_type'] == 'login') {
					var me_client_id = document.getelementbyid("me_client_id");
					me_client_id.value = jsonobject.me_client_id;
				}


				messageelement.textcontent = event.data;
				messagecontainer.appendchild(messageelement);
			};

			// 连接关闭时触发的事件处理函数  
			socket.onclose = function(event) {
				console.log("连接已关闭");
			};

			// 连接发生错误时触发的事件处理函数  
			socket.onerror = function(error) {
				console.error("websocket 错误: " + error);
			};
		</script>
	</head>
	<body>
		<h1>websocket example</h1>
		<input type="text" id="me_client_id">
		<input type="text" id="message">
		<button onclick="sendmessage()">发送</button>
		<div id="message-container"></div>
	</body>
</html>

4.拿到client_id 绑定uid

    /**
     * 获取所有在线人数
     * @return array
     * https://kf1***om/api/wxapp/send/get_all_uid_list
     */
    public function get_all_uid_list()
    {
        $restult = gateway::getalluidlist();
        dump($restult);
        exit();
    }

    /**
     * client_id与uid绑定
     * 传入自己的client_id和openid,自动绑定为w+用户id  例如w1,w2,w100005
     * @return array
     * https://kf****om/api/wxapp/send/bind_uid
     */
    public function bind_uid()
    {
        $client_id = '7f0000010b5400000004';
        $uid       = 'w002';
        gateway::binduid($client_id, $uid);
        dump(cmf_random_string(100, 3));
        exit();
    }

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com