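The EMQ X message broker cluster is built on the distributed design of Erlang/OTP. Its working principle can be summarized in two rules:
- When an MQTT client subscribes to a topic, the node it is connected to, once the subscription succeeds, broadcasts to the other nodes that this topic is subscribed on this node.
- When an MQTT client publishes a message, the node it is connected to looks up the subscriptions for the message's topic and routes the message to the relevant nodes.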
1. Cluster Architecture
All nodes in the same EMQ X cluster keep a copy of the topic -> node routing table, for example:

topic1 -> node-01, node-02
topic2 -> node-03
topic3 -> node-02, node-04
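The two rules and the routing table above can be modeled in a few lines of bash. This is a toy sketch, not EMQ X's implementation. It assumes exact topic names only, although EMQ X also supports wildcard subscriptions. It keeps the table in a single shell variable instead of replicating it across nodes. The helper names `add_route` and `route` are hypothetical.

```bash
#!/usr/bin/env bash
# Toy topic -> node routing table (exact topics only; no wildcards).
declare -A ROUTES=(
  [topic1]="node-01 node-02"
  [topic2]="node-03"
  [topic3]="node-02 node-04"
)

# Rule 1: a successful subscription on NODE is broadcast, and every node
# applies the same update, so NODE is added once to the topic's entry.
add_route() {
  local topic=$1 node=$2
  case " ${ROUTES[$topic]} " in
    *" $node "*) ;;   # already routed to this node
    *) ROUTES[$topic]="${ROUTES[$topic]:+${ROUTES[$topic]} }$node" ;;
  esac
}

# Rule 2: on publish, look up the topic and print the nodes to forward to.
route() {
  echo "${ROUTES[$1]}"
}
```

In this model every node would hold an identical `ROUTES`, so a publish on any node can resolve the target nodes locally without asking the others.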
2. Cluster Deployment Plan
| Item | node 1 | node 2 | node 3 |
|---|---|---|---|
| node.name | node-01@127.0.0.1 | node-02@127.0.0.1 | node-03@127.0.0.1 |
| mqtt:tcp:external | 3000 | 3100 | 3200 |
| mqtt:ssl:external | 3010 | 3110 | 3210 |
| mqtt:ws:external | 3020 | 3120 | 3220 |
| mqtt:wss:external | 3030 | 3130 | 3230 |
| http:management | 3040 | 3140 | 3240 |
| http:dashboard | 3050 | 3150 | 3250 |
| mqtt:tcp:internal | 13000 | 13100 | 13200 |
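The port plan follows a regular pattern:

- Each node shifts its ports by 100.
- Each listener has a fixed offset (+0, +10, ... +50) inside that block.
- The internal TCP listener sits 10000 above the external one.

A small bash helper can regenerate one column of the table. The name `node_ports` is hypothetical. Using it for node 4 and beyond extrapolates the pattern, which is an assumption and not part of the original plan.

```bash
#!/usr/bin/env bash
# Print the plan for node number N (1-based), following the table above:
# every node shifts its ports by 100; offsets inside a node's block are fixed.
node_ports() {
  local n=$1
  local base=$(( 3000 + (n - 1) * 100 ))
  printf 'node.name=node-%02d@127.0.0.1\n' "$n"
  printf 'mqtt:tcp:external=%d\n' "$base"
  printf 'mqtt:ssl:external=%d\n' $(( base + 10 ))
  printf 'mqtt:ws:external=%d\n'  $(( base + 20 ))
  printf 'mqtt:wss:external=%d\n' $(( base + 30 ))
  printf 'http:management=%d\n'   $(( base + 40 ))
  printf 'http:dashboard=%d\n'    $(( base + 50 ))
  printf 'mqtt:tcp:internal=%d\n' $(( base + 10000 ))
}

# Usage: node_ports 2   # prints the "node 2" column of the table
```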
3. Installation Package
Download the package

Download the Ubuntu zip package:
- Taking version 4.3.10 as an example: https://www.emqx.com/zh/downloads/broker/4.3.10/emqx-ubuntu20.04-4.3.10-amd64.zip
- Other versions: https://www.emqx.com/zh/try?product=broker
Unzip the package
unzip emqx-ubuntu20.04-4.3.10-amd64.zip
4. Cluster Deployment
4.1 Create Directories
Create a directory for each of the three nodes; for more nodes, continue the pattern as node-N:
cd /data
mkdir -p emqx/dc3/node-01 emqx/dc3/node-02 emqx/dc3/node-03
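The `node-N` pattern can be generated for any node count with a loop. A minimal sketch follows. The helper name `make_node_dirs` is hypothetical. Called with `emqx/dc3` and `3`, it creates the same three directories as the `mkdir` above.

```bash
#!/usr/bin/env bash
# Create ROOT/node-01 .. ROOT/node-NN (zero-padded, as in node-01).
make_node_dirs() {
  local root=$1 count=$2 i
  for (( i = 1; i <= count; i++ )); do
    mkdir -p "$(printf '%s/node-%02d' "$root" "$i")"
  done
}

# Usage, equivalent to the mkdir above:
#   cd /data && make_node_dirs emqx/dc3 3
```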
Copy the unzipped emqx files into each node directory; the steps are the same for every node:
cp -r emqx* emqx/dc3/node-01/
# pwd
# /data/emqx/dc3/node-01
# ls
# bin data dynlibs erts-11.1.8 etc lib log releases
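The same copy has to be repeated for every node directory. The minimal bash sketch below rests on three assumptions:

- The unzipped package directory lives outside `/data/emqx`, because copying a directory into a subdirectory of itself fails.
- The helper name `install_nodes` is hypothetical.
- The path in the usage comment is a placeholder.

```bash
#!/usr/bin/env bash
# Copy the contents of the unpacked release PKG (bin/, etc/, lib/, ...) into
# every ROOT/node-* directory. PKG must not be inside ROOT.
install_nodes() {
  local pkg=$1 root=$2 dir
  for dir in "$root"/node-*/; do
    [ -d "$dir" ] || continue   # glob matched nothing: no node directories
    # "$pkg/." copies what is inside PKG, not the PKG folder itself,
    # so each node ends up with bin/, etc/, ... at its top level.
    cp -R "$pkg/." "$dir"
  done
}

# Usage (placeholder path for wherever the zip was extracted):
#   install_nodes /opt/emqx-pkg/emqx /data/emqx/dc3
```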
4.2 Configuration Files
In each node's etc directory, configure emqx.conf, plugins/emqx_dashboard.conf, and plugins/emqx_management.conf.
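All three nodes in this plan run on 127.0.0.1, so each copy of etc/emqx.conf needs its own node.name and its own Erlang distribution port (node.dist_listen_min / node.dist_listen_max in the listing below). Two nodes on one host cannot listen on the same port. By contrast, node.cookie must be identical on every node, or the nodes cannot connect. The bash sketch below automates both steps. It makes three assumptions:

- The helper names `conf_value`, `patch_node_conf` and `check_cluster_conf` are hypothetical.
- The port offset 6369 + (N - 1) is an illustrative choice.
- Only the node.* keys are touched; the listener ports from the plan table are not handled.

```bash
#!/usr/bin/env bash
# Print the last value assigned to KEY (a sed regex, e.g. 'node\.cookie') in FILE.
conf_value() {
  sed -n "s/^$2[[:space:]]*=[[:space:]]*//p" "$1" | tail -n 1
}

# Give node number N (1-based) a unique node.name and distribution port.
# Assumption: port = 6369 + (N - 1), so node-01 keeps the 6369 shown below.
# sed -i.bak leaves a backup copy next to the edited file.
patch_node_conf() {
  local conf=$1 n=$2 name port
  name=$(printf 'node-%02d@127.0.0.1' "$n")
  port=$(( 6369 + n - 1 ))
  sed -i.bak \
    -e "s/^node\.name[[:space:]]*=.*/node.name = $name/" \
    -e "s/^node\.dist_listen_min[[:space:]]*=.*/node.dist_listen_min = $port/" \
    -e "s/^node\.dist_listen_max[[:space:]]*=.*/node.dist_listen_max = $port/" \
    "$conf"
}

# Check ROOT/node-*/etc/emqx.conf: one shared cookie, no duplicate node names.
check_cluster_conf() {
  local root=$1 dir cookie first="" names=""
  for dir in "$root"/node-*; do
    cookie=$(conf_value "$dir/etc/emqx.conf" 'node\.cookie')
    first=${first:-$cookie}
    if [ -z "$cookie" ] || [ "$cookie" != "$first" ]; then
      echo "node.cookie missing or different in $dir" >&2
      return 1
    fi
    names+="$(conf_value "$dir/etc/emqx.conf" 'node\.name')"$'\n'
  done
  if [ -n "$(printf '%s' "$names" | sort | uniq -d)" ]; then
    echo "duplicate node.name found" >&2
    return 1
  fi
  echo ok
}

# Usage after copying the package into node-01 .. node-03:
#   for n in 1 2 3; do
#     patch_node_conf "$(printf '/data/emqx/dc3/node-%02d/etc/emqx.conf' "$n")" "$n"
#   done
#   check_cluster_conf /data/emqx/dc3
```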
4.2.1 emqx.conf
## EMQ X Configuration 4.3
## NOTE: Do not change format of CONFIG_SECTION_{BGN,END} comments!
## CONFIG_SECTION_BGN=cluster ==================================================
## Cluster name.
##
## Value: String
cluster.name = dc3-emqx-cluster
## Specify the erlang distributed protocol.
##
## Value: Enum
## - inet_tcp: the default; handles TCP streams with IPv4 addressing.
## - inet6_tcp: handles TCP with IPv6 addressing.
## - inet_tls: using TLS for Erlang Distribution.
##
## vm.args: -proto_dist inet_tcp
cluster.proto_dist = inet_tcp
## Cluster auto-discovery strategy.
##
## Value: Enum
## - manual: Manual join command
## - static: Static node list
## - mcast: IP Multicast
## - dns: DNS A Record
## - etcd: etcd
## - k8s: Kubernetes
##
## Default: manual
cluster.discovery = manual
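## Example (sketch): with cluster.discovery = manual the nodes do not find
## each other on their own. After all three nodes are started, join node-02
## and node-03 to node-01 with the EMQ X 4.x CLI, then check membership
## (paths follow the directory layout used in this guide):
##   /data/emqx/dc3/node-02/bin/emqx_ctl cluster join node-01@127.0.0.1
##   /data/emqx/dc3/node-03/bin/emqx_ctl cluster join node-01@127.0.0.1
##   /data/emqx/dc3/node-01/bin/emqx_ctl cluster status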
## Enable cluster autoheal from network partition.
##
## Value: on | off
##
## Default: on
cluster.autoheal = on
## Autoclean down node. A down node will be removed from the cluster
## if this value > 0.
##
## Value: Duration
## -h: hour, e.g. '2h' for 2 hours
## -m: minute, e.g. '5m' for 5 minutes
## -s: second, e.g. '30s' for 30 seconds
##
## Default: 5m
cluster.autoclean = 5m
##--------------------------------------------------------------------
## Cluster using static node list
## Node list of the cluster.
##
## Value: String
## cluster.static.seeds = emqx1@127.0.0.1,emqx2@127.0.0.1
##--------------------------------------------------------------------
## Cluster using IP Multicast.
## IP Multicast Address.
##
## Value: IP Address
## cluster.mcast.addr = 239.192.0.1
## Multicast Ports.
##
## Value: Port List
## cluster.mcast.ports = 4369,4370
## Multicast Iface.
##
## Value: Iface Address
##
## Default: 0.0.0.0
## cluster.mcast.iface = 0.0.0.0
## Multicast Ttl.
##
## Value: 0-255
## cluster.mcast.ttl = 255
## Multicast loop.
##
## Value: on | off
## cluster.mcast.loop = on
##--------------------------------------------------------------------
## Cluster using DNS A records.
## DNS name.
##
## Value: String
## cluster.dns.name = localhost
## The App name is used to build 'node.name' with IP address.
##
## Value: String
## cluster.dns.app = emqx
##--------------------------------------------------------------------
## Cluster using etcd
## Etcd server list, separated by ','.
##
## Value: String
## cluster.etcd.server = http://127.0.0.1:2379
## The prefix helps build nodes path in etcd. Each node in the cluster
## will create a path in etcd: v2/keys/<prefix>/<cluster.name>/<node.name>
##
## Value: String
## cluster.etcd.prefix = dc3-emqx-cluster
## The TTL for node's path in etcd.
##
## Value: Duration
##
## Default: 1m, 1 minute
## cluster.etcd.node_ttl = 1m
## Path to a file containing the client's private PEM-encoded key.
##
## Value: File
## cluster.etcd.ssl.keyfile = etc/certs/client-key.pem
## The path to a file containing the client's certificate.
##
## Value: File
## cluster.etcd.ssl.certfile = etc/certs/client.pem
## Path to the file containing PEM-encoded CA certificates. The CA certificates
## are used during server authentication and when building the client certificate chain.
##
## Value: File
## cluster.etcd.ssl.cacertfile = etc/certs/ca.pem
##--------------------------------------------------------------------
## Cluster using Kubernetes
## Kubernetes API server list, separated by ','.
##
## Value: String
## cluster.k8s.apiserver = http://10.110.111.204:8080
## The service name helps lookup EMQ nodes in the cluster.
##
## Value: String
## cluster.k8s.service_name = emqx
## The address type is used to extract host from k8s service.
##
## Value: ip | dns | hostname
## cluster.k8s.address_type = ip
## The app name helps build 'node.name'.
##
## Value: String
## cluster.k8s.app_name = emqx
## The suffix added to dns and hostname get from k8s service
##
## Value: String
## cluster.k8s.suffix = pod.cluster.local
## Kubernetes Namespace
##
## Value: String
## cluster.k8s.namespace = default
## CONFIG_SECTION_END=cluster ==================================================
##--------------------------------------------------------------------
## Node
##--------------------------------------------------------------------
## Node name.
##
## See: http://erlang.org/doc/reference_manual/distributed.html
##
## Value: <name>@<host>
##
## Default: node-01@127.0.0.1
node.name = node-01@127.0.0.1
## Cookie for distributed node communication.
##
## Value: String
node.cookie = emqxsecretcookie
## Data dir for the node
##
## Value: Folder
node.data_dir = data
## Heartbeat monitoring of an Erlang runtime system. Comment out the line to
## disable heartbeat, or set the value to 'on' to enable it.
##
## Turning this on may cause the node to restart if it becomes unresponsive to
## the heartbeat pings.
##
## NOTE: When managed by systemd (or another supervision tool),
## heart will probably only cause EMQ X to stop; whether it is restarted
## depends on systemd's restart strategy.
## NOTE: When running in docker, the container will die as soon as the
## heart process kills EMQ X; whether it is restarted depends on the
## container supervision strategy, such as k8s restartPolicy.
##
## Value: on
##
## vm.args: -heart
## node.heartbeat = on
## Sets the number of threads in async thread pool. Valid range is 0-1024.
##
## See: http://erlang.org/doc/man/erl.html
##
## Value: 0-1024
##
## vm.args: +A Number
## node.async_threads = 4
## Sets the maximum number of simultaneously existing processes for this
## system if a Number is passed as value.
##
## See: http://erlang.org/doc/man/erl.html
##
## Value: Number [1024-134217727]
##
## vm.args: +P Number
## node.process_limit = 2097152
## Sets the maximum number of simultaneously existing ports for this system.
##
## See: http://erlang.org/doc/man/erl.html
##
## Value: Number [1024-134217727]
##
## vm.args: +Q Number
## node.max_ports = 1048576
## Sets the distribution buffer busy limit (dist_buf_busy_limit).
##
## See: http://erlang.org/doc/man/erl.html
##
## Value: Number [1KB-2GB]
##
## vm.args: +zdbbl size
## node.dist_buffer_size = 8MB
## Sets the maximum number of ETS tables. Note that mnesia and SSL will
## create temporary ETS tables.
##
## Value: Number
##
## vm.args: +e Number
## node.max_ets_tables = 262144
## Global GC Interval.
##
## Value: Duration
##
## Examples:
## - 2h: 2 hours
## - 30m: 30 minutes
## - 20s: 20 seconds
##
## Default: 15 minutes
node.global_gc_interval = 15m
## Tweak GC to run more often.
##
## Value: Number [0-65535]
##
## vm.args: -env ERL_FULLSWEEP_AFTER Number
## node.fullsweep_after = 1000
## Crash dump log file.
##
## Value: Log file
node.crash_dump = log/crash.dump
## Specify SSL Options in the file if using SSL for Erlang Distribution.
##
## Value: File
##
## vm.args: -ssl_dist_optfile <File>
## node.ssl_dist_optfile = etc/ssl_dist.conf
## Sets the net_kernel tick time. TickTime is specified in seconds.
## Notice that all communicating nodes are to have the same TickTime
## value specified.
##
## See: http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
##
## Value: Number
##
## vm.args: -kernel net_ticktime Number
## node.dist_net_ticktime = 120
## Sets the port range for the listener socket of a distributed Erlang node.
## Note that if there are firewalls between clustered nodes, this port segment
## for nodes’ communication should be allowed.
##
## See: http://www.erlang.org/doc/man/kernel_app.html
##
## Value: Port [1024-65535]
node.dist_listen_min = 6369
node.dist_listen_max = 6369
node.backtrace_depth = 16
## CONFIG_SECTION_BGN=rpc ======================================================
## RPC Mode.
##
## Value: sync | async
rpc.mode = async
## Max batch size of async RPC requests.
##
## Value: Integer
## Zero or negative value disables rpc batching.
##
## NOTE: RPC batch won't work when rpc.mode = sync
rpc.async_batch_size = 256
## RPC port discovery
##
## The strategy for discovering the RPC listening port of other nodes.
##
## Value: Enum
## - manual: discover ports by `tcp_server_port` and `tcp_client_port`.
## - stateless: discover ports in a stateless manner.
## If node name is `emqx<N>@127.0.0.1`, where the `<N>` is an integer,
## then the listening port will be `5370 + <N>`
##
## Defaults to `stateless`.
rpc.port_discovery = stateless
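## Example (sketch) of the stateless rule above:
##   emqx1@127.0.0.1 -> 5370 + 1 = 5371
##   emqx2@127.0.0.1 -> 5370 + 2 = 5372
## The comment only documents the emqx<N> form. For names such as
## node-01@127.0.0.1 used in this plan, confirm the actual RPC listening
## ports after start-up (e.g. with `ss -ltnp`), since all three nodes share
## 127.0.0.1 and their ports must not collide.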
## TCP port number for RPC server to listen on.
##
## Only takes effect when `rpc.port_discovery` = `manual`.
##
## NOTE: All nodes in the cluster should agree to this same config.
##
## Value: Port [1024-65535]
#rpc.tcp_server_port = 5369
## Number of outgoing RPC connections.
##
## Value: Integer [0-256]
## Default = 1
#rpc.tcp_client_num = 1
## RPC client connect timeout.
##
## Value: Seconds
rpc.connect_timeout = 5s
## TCP send timeout of RPC client and server.
##
## Value: Seconds
rpc.send_timeout = 5s
## Authentication timeout
##
## Value: Seconds
rpc.authentication_timeout = 5s
## Default receive timeout for call() functions
##
## Value: Seconds
rpc.call_receive_timeout = 15s
## Socket idle keepalive.
##
## Value: Seconds
rpc.socket_keepalive_idle = 900s
## TCP Keepalive probes interval.
##
## Value: Seconds
rpc.socket_keepalive_interval = 75s
## Probes lost to close the connection
##
## Value: Integer
rpc.socket_keepalive_count = 9
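## Worked example: with the three keepalive values above, an unresponsive
## RPC peer is detected after roughly idle + interval * count
## = 900 + 75 * 9 = 1575 s (about 26 minutes).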
## Size of TCP send buffer.
##
## Value: Bytes
rpc.socket_sndbuf = 1MB
## Size of TCP receive buffer.
##
## Value: Bytes
rpc.socket_recbuf = 1MB
## Size of user-level software socket buffer.
##
## Value: Bytes
rpc.socket_buffer = 1MB
## CONFIG_SECTION_END=rpc ======================================================
## CONFIG_SECTION_BGN=logger ===================================================
## Where to emit the logs.
## Enable the console (standard output) logs.
##
## Value: file | console | both
## - file: write logs only to file
## - console: write logs only to standard I/O
## - both: write logs both to file and standard I/O
log.to = file
## The log severity level.
##
## Value: debug | info | notice | warning | error | critical | alert | emergency
##
## Note: Only the messages with severity level higher than or equal to
## this level will be logged.
##
## Default: warning
log.level = warning
## The dir for log files.
##
## Value: Folder
log.dir = log
## The log filename for logs of level specified in "log.level".
##
## If `log.rotation` is enabled, this is the base name of the
## files. Each file in a rotated log is named <base_name>.N, where N is an integer.
##
## Value: String
## Default: emqx.log
log.file = emqx.log
## Limits the total number of characters printed for each log event.
##
## Value: Integer
## Default: No Limit
#log.chars_limit = 8192
## Maximum depth for Erlang term log formatting
## and Erlang process message queue inspection.
##
## Value: Integer or 'unlimited' (without quotes)
## Default: 100
#log.max_depth = 100
## Log formatter
## Value: text | json
#log.formatter = text
## Log to single line
## Value: Boolean
#log.single_line = true
## Enables the log rotation.
## With this enabled, new log files will be created when the current
## log file is full; at most `log.rotation.count` files will be created.
##
## Value: on | off
## Default: on
log.rotation = on
## Maximum size of each log file.
##
## Value: Number
## Default: 10M
## Supported Unit: KB | MB | GB
log.rotation.size = 10MB
## Maximum rotation count of log files.
##
## Value: Number
## Default: 5
log.rotation.count = 5
## To create additional log files for specific log levels.
##
## Value: File Name
## Format: log.$level.file = $filename,
## where "$level" can be one of: debug, info, notice, warning,
## error, critical, alert, emergency
## Note: Log files for a specific log level will only contain logs
## at or above that level.
##
#log.info.file = info.log
#log.error.file = error.log
## The max allowed queue length before switching to sync mode.
##
## Log overload protection parameter. If the message queue grows
## larger than this value the handler switches f