契机
使用nginx作为负载均衡(load balancing)的时候,发现真实ip无法获取。几经折腾终于拿到真实ip,又发现被代理的端口又无法使用非代理模式连接,由于之前暴露的docker端口有限,从中部分取巧终于拨开云雾见光明,再结合java客户端获取真实ip一气呵成。
emqx配置
#docker部署 #18083为管理页面端口,1883为默认mqtt端口,8883为默认mqtts端口 #如果还没有新建emqx容器,建议多暴露端口,具体原因见下文 docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.7.2
- 进入emqx管理页面,新建proxy_tcp监听器,类型为tcp,端口为8883
- 然后关闭之前ssl-8883监听
- 借用端口是因为docker暴露到宿主机的端口不够了,一旦设置比如proxy_tcp监听器的8883代理监听后,客户端就只能通过nginx代理连接这个8883端口了,直接连接8883就不行了

#获取emqx的容器id,假如为1111
docker ps | grep emqx
#将配置文件拷贝出来
docker cp 1111:/opt/emqx/etc/emqx.conf .
#修改,添加下面项目
vim emqx.conf
#打开proxy_tcp的代理监听
listeners.tcp.proxy_tcp {
proxy_protocol = true
}
#关闭default的代理,默认是关闭的
#但是一旦打开过,就要手动设置为false
listeners.tcp.default {
proxy_protocol = false
}
#放置回去
docker cp ./emqx.conf 1111:/opt/emqx/etc/
#重启容器
docker restart 1111
此时
- 直接使用客户端设备连接8883端口失败,无论mqtt/mqtts协议
- 1883端口可以使用mqtt协议可以正常连接
- emqx不存放证书,ssl认证是一个耗时的操作,丢到nginx去搞
nginx配置
#nginx安装 - 略
#nginx安装模块
sudo yum install nginx-mod-stream
#修改nginx配置文件
vim /etc/nginx/nginx.conf
#配置文件如下
#192.168.0.1为emqx所在服务器,与nginx不在一个服务器
#监听1883为mqtt
#监听8883为mqtts
stream {
upstream stream_backend {
zone tcp_servers 64k;
hash $remote_addr;
server 192.168.0.1:1883 max_fails=2 fail_timeout=30s;
}
server {
listen 1883;
proxy_pass stream_backend;
proxy_buffer_size 4k;
}
upstream stream_backend_ssl {
zone tcp_servers 64k;
hash $remote_addr;
server 192.168.0.1:8883 max_fails=2 fail_timeout=30s;
}
server {
listen 8883 ssl;
proxy_protocol on;
proxy_pass stream_backend_ssl;
proxy_buffer_size 4k;
ssl_handshake_timeout 15s;
ssl_certificate /etc/nginx/cert/_.xx.pem;
ssl_certificate_key /etc/nginx/cert/_.xx.key;
}
}
#重启nginx
sudo systemctl restart nginx
此时
- 8883为mqtts端口,链接mqtts,emqx控制台可以看到真实ip
- 1883为mqtt端口,链接mqtt,emqx控制台看不到真实ip
- 理论上业务只暴露mqtts端口到外网,mqtt为内部调试方便抓包用的

脚本获取
设置api密钥,保存下username和pasword

#!/bin/bash # check if both host and password arguments are provided if [ $# -lt 2 ]; then echo "usage: $0 <host> <password>" exit 1 fi # define host, username, and password host="$1" username="username" password="$2" # encode username and password in base64 auth=$(echo -n "$username:$password" | base64) # create the curl command curl -h "authorization: basic $auth" "http://$host:18083/api/v5/clients?like_username=server"
#运行 chmod +x ./emqx_curl.sh ./emqx_curl.sh localhost your_password_here
java获取
service
package com.bothsavage.common.mqtt.service;
import com.alibaba.fastjson.jsonobject;
import com.bothsavage.common.mqtt.config.interceptor.emqxauthinterceptor;
import com.bothsavage.common.mqtt.entity.dto.clientinfodto;
import com.bothsavage.common.mqtt.entity.dto.emqxrespdto;
import feign.headers;
import org.springframework.cloud.openfeign.feignclient;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.pathvariable;
import org.springframework.web.bind.annotation.requestparam;
/**
* emqx请求
*
* @author bothsavage
*/
@feignclient(name = "emqxrmiservice", url = "${emqx.rmi.url:http://127.0.0.1:18083/}",configuration = emqxauthinterceptor.class)
public interface emqxrmiservice {
/**
* 获取客户端信息
*
* @param clientid 客户端id
* @return -
*/
@getmapping(value = "/api/v5/clients/{id}")
@headers(value = {"content-type=application/json;charset=utf-8", "accept=application/json"})
clientinfodto getclientbyid(@pathvariable("id") string clientid);
/**
* 查询客户端信息
*
* @param likeusername 客户端名称
* @return -
*/
@getmapping(value = "/api/v5/clients")
@headers(value = {"content-type=application/json;charset=utf-8", "accept=application/json"})
emqxrespdto<clientinfodto> getclientsbyusername(@requestparam("like_username") string likeusername);
}
auth
package com.bothsavage.common.mqtt.config.interceptor;
import feign.requestinterceptor;
import feign.requesttemplate;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.configuration;
import java.util.base64;
/**
* emqx请求权限拦截
*
* @author bothsavage
*/
@configuration
public class emqxauthinterceptor implements requestinterceptor {
@value("${emqx.rmi.username:x}")
private string username;
@value("${emqx.rmi.password:x}")
private string password;
@override
public void apply(requesttemplate template) {
string auth = username + ":" + password;
string encodedauth = base64.getencoder().encodetostring(auth.getbytes());
template.header("authorization", "basic " + encodedauth);
}
}
dto
package com.bothsavage.common.mqtt.entity.dto;
import com.alibaba.fastjson.annotation.jsonfield;
import lombok.data;
/**
* dto
*
* @author bothsavage
*/
@data
public class clientinfodto {
@jsonfield(name = "clientid")
private string clientid;
@jsonfield(name = "mqueue_len")
private integer mqueuelen;
@jsonfield(name = "reductions")
private integer reductions;
@jsonfield(name = "keepalive")
private integer keepalive;
@jsonfield(name = "listener")
private string listener;
@jsonfield(name = "proto_ver")
private integer protover;
@jsonfield(name = "recv_msg.dropped.await_pubrel_timeout")
private integer recvmsgdroppedawaitpubreltimeout;
@jsonfield(name = "send_msg.dropped.expired")
private integer sendmsgdroppedexpired;
@jsonfield(name = "mountpoint")
private object mountpoint;
@jsonfield(name = "mailbox_len")
private integer mailboxlen;
@jsonfield(name = "send_msg")
private integer sendmsg;
@jsonfield(name = "zone")
private string zone;
@jsonfield(name = "subscriptions_cnt")
private integer subscriptionscnt;
@jsonfield(name = "heap_size")
private integer heapsize;
@jsonfield(name = "recv_msg")
private integer recvmsg;
@jsonfield(name = "recv_cnt")
private integer recvcnt;
@jsonfield(name = "send_msg.dropped.too_large")
private integer sendmsgdroppedtoolarge;
@jsonfield(name = "awaiting_rel_cnt")
private integer awaitingrelcnt;
@jsonfield(name = "subscriptions_max")
private string subscriptionsmax;
@jsonfield(name = "recv_msg.qos0")
private integer recvmsgqos0;
@jsonfield(name = "recv_msg.qos1")
private integer recvmsgqos1;
@jsonfield(name = "recv_msg.qos2")
private integer recvmsgqos2;
@jsonfield(name = "node")
private string node;
@jsonfield(name = "inflight_max")
private integer inflightmax;
@jsonfield(name = "port")
private integer port;
@jsonfield(name = "recv_pkt")
private integer recvpkt;
@jsonfield(name = "send_oct")
private integer sendoct;
@jsonfield(name = "inflight_cnt")
private integer inflightcnt;
@jsonfield(name = "awaiting_rel_max")
private string awaitingrelmax;
@jsonfield(name = "is_persistent")
private boolean ispersistent;
@jsonfield(name = "send_msg.dropped")
private integer sendmsgdropped;
@jsonfield(name = "recv_msg.dropped")
private integer recvmsgdropped;
@jsonfield(name = "clean_start")
private boolean cleanstart;
@jsonfield(name = "send_msg.qos0")
private integer sendmsgqos0;
@jsonfield(name = "created_at")
private string createdat;
@jsonfield(name = "connected_at")
private string connectedat;
@jsonfield(name = "enable_authn")
private boolean enableauthn;
@jsonfield(name = "mqueue_dropped")
private integer mqueuedropped;
@jsonfield(name = "is_bridge")
private boolean isbridge;
@jsonfield(name = "send_msg.dropped.queue_full")
private integer sendmsgdroppedqueuefull;
@jsonfield(name = "proto_name")
private string protoname;
@jsonfield(name = "ip_address")
private string ipaddress;
@jsonfield(name = "send_cnt")
private integer sendcnt;
@jsonfield(name = "connected")
private boolean connected;
@jsonfield(name = "recv_oct")
private integer recvoct;
@jsonfield(name = "mqueue_max")
private integer mqueuemax;
@jsonfield(name = "send_msg.qos2")
private integer sendmsgqos2;
@jsonfield(name = "send_msg.qos1")
private integer sendmsgqos1;
@jsonfield(name = "expiry_interval")
private integer expiryinterval;
@jsonfield(name = "send_pkt")
private integer sendpkt;
@jsonfield(name = "username")
private string username;
}
test
clientip = emqxrmiservice.getclientbyid("clientid").getipaddress();
总结
- 路线清晰,只是操作起来稍微麻烦点
- 注意:emqx打开了代理的访问的监听器,只能通过nginx来中转,无法直接链接了
- 当前只用了一个emqx,没有做集群
- docker端口 暴露需要前期规划好,要不然特别麻烦
到此这篇关于利用java获取被nginx代理的emqx客户端真实ip的文章就介绍到这了,更多相关java获取客户端ip内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
发表评论