一、背景
最近业余时间想基于Envoy 开发一个串口网关,主要是想把一些 modbus、bacnet 以及 mqtt 等物联网协议接入Envoy中,当读到串口数据后可以转发成对应的网络协议
二、Envoy的优势
选择Envoy主要是因为Envoy的代码已经十分健全了,零信任、连接池、DNS解析、健康检查、集群调度等等Envoy都支持得很完善,思来想去还是决定在Envoy基础上做二次开发。Envoy应该是C++里写得最好的网关了,内存占用小,而且扩展性极强。
三、Envoy在物联网方面的劣势
但是Envoy也有一些缺点
1、Envoy是基于互联网的网关对物联网模块支持不足、对物联网协议支持的不多
2、Envoy 代码量巨大,开发难度和成本都非常高,开发起来既困难又复杂,对技术要求也十分高。
3、Envoy代码量巨大,改动一个文件后重新编译就可能要几个小时
4、Envoy当前ListenerManager 并没有很好的扩展性,甚至在Bazel 文件里可见性只有几个模块,而且只支持UDP和TCP两种通信,要加一个串口通信难度并不小。
四、我们该怎么做?
尽量不要自己写基本的串口代码,使用第三方库 libserial
libserial
串口的第三方库
https://github.com/crayzeewulf/libserial
我们需要做的事情分两步:
1、加入新的listener
2、引入第三方串口库
1、加入新的listener
Envoy本身不支持Listener模块的扩展,只支持Filter的扩展。如果想扩展Listener模块,就需要改动ListenerManager的代码,动刀需要谨慎,所以我拷贝出来一份ListenerManager
核心代码改动:
修改配置下发格式为:
# Example listener configuration carrying the new ``rtu`` address entry, whose ``path`` names the
# serial device (here /dev/ttyS0).
# NOTE(review): the original indentation of this snippet was lost. ``socket_address`` and ``rtu``
# are both members of the ``Address.address`` oneof in address.proto, so presumably only one of
# them can be set under a single ``address`` -- confirm the intended nesting.
listeners:
- name: listener_0
address:
socket_address:
address: 0.0.0.0
port_value: 10001
rtu:
path: /dev/ttyS0
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
修改proto配置文件
api/envoy/config/core/v3/address.proto
加入串口的Rtu形式:
// Addresses specify either a logical or physical address and port, which are
// used to tell Envoy where to bind/listen, connect to upstream and find
// management servers.
//
// Exactly one member of the ``address`` oneof must be set (the oneof is marked required).
message Address {
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.core.Address";
oneof address {
option (validate.required) = true;
SocketAddress socket_address = 1;
Pipe pipe = 2;
// Specifies a user-space address handled by :ref:`internal listeners
// <envoy_v3_api_field_config.listener.v3.Listener.internal_listener>`.
EnvoyInternalAddress envoy_internal_address = 3;
// Specifies a serial (RTU) device address, identified by its device path.
Rtu rtu = 4;
}
}
// Address of a serial (RTU) device, e.g. ``/dev/ttyS0``.
message Rtu {
// NOTE(review): this annotation names a v2 message ``envoy.api.v2.core.Rtu``; confirm that such
// a message exists, otherwise the annotation was presumably copied from ``Pipe`` by mistake.
option (udpa.annotations.versioning).previous_message_type = "envoy.api.v2.core.Rtu";
// Path of the serial device. Must be at least one character long.
// NOTE(review): the C++ address class that consumes this field still treats a path starting
// with '@' as an abstract-namespace unix socket name (the leading '@' is replaced by a null
// byte, and such paths are rejected outside Linux) -- confirm whether that is wanted for a
// serial device path.
string path = 1 [(validate.rules).string = {min_len: 1}];
// The mode for the Rtu. Must not exceed 511. Not applicable for abstract sockets.
uint32 mode = 2 [(validate.rules).uint32 = {lte: 511}];
}
source/common/network/utility.cc
适配串口:
case envoy::config::core::v3::SocketAddress::UDP:
return Socket::Type::Datagram;
case envoy::config::core::v3::SocketAddress::SERIAL:
return Socket::Type::Stream;
}
}
由于要换用我们自己扩展的 watchermen ListenerManager,Envoy 原来的 ListenerManager 不能再直接使用,所以先注释掉 Envoy 原有的 ListenerManager 注册入口:
//REGISTER_FACTORY(DefaultListenerManagerFactoryImpl, ListenerManagerFactory);
加入新的串口实例化方式
source/common/network/utility.cc
return std::make_shared<Address::EnvoyInternalInstance>(
proto_address.envoy_internal_address().server_listener_name(),
proto_address.envoy_internal_address().endpoint_id());
case envoy::config::core::v3::Address::AddressCase::kRtu:
return std::make_shared<Address::RtuInstance>(proto_address.rtu().path(),
proto_address.rtu().mode());
case envoy::config::core::v3::Address::AddressCase::ADDRESS_NOT_SET:
PANIC_DUE_TO_PROTO_UNSET;
}
case envoy::config::core::v3::Address::AddressCase::kEnvoyInternalAddress:
// Currently internal address supports stream operation only.
return Socket::Type::Stream;
case envoy::config::core::v3::Address::AddressCase::kRtu:
return Socket::Type::Stream;
case envoy::config::core::v3::Address::AddressCase::ADDRESS_NOT_SET:
PANIC_DUE_TO_PROTO_UNSET;
}
/**
 * Address instance backing the RTU (serial) address. Its layout mirrors a pipe address: the
 * path is stored in a sockaddr_un and exposed to callers through pipe().
 * NOTE(review): pipe() returns a non-null Pipe and addressType() reports "default", so callers
 * will presumably handle this address like a unix domain socket -- confirm this is intended
 * for a serial device path.
 */
class RtuInstance : public InstanceBase {
public:
/**
 * Construct from an existing unix address.
 * @param address the sockaddr_un to copy; a leading null byte in sun_path marks an abstract
 *        namespace address.
 * @param ss_len length of the address; used to derive the abstract name length.
 * @param mode mode stored with the address; must be 0 for abstract namespace addresses.
 * @param sock_interface socket interface to use; nullptr selects the default one.
 */
explicit RtuInstance(const sockaddr_un* address, socklen_t ss_len, mode_t mode = 0,
const SocketInterface* sock_interface = nullptr);
/**
 * Construct from a string path.
 * @param pipe_path the path; a leading '@' selects the abstract namespace.
 * @param mode mode stored with the address; must be 0 for abstract namespace addresses.
 * @param sock_interface socket interface to use; nullptr selects the default one.
 */
explicit RtuInstance(const std::string& pipe_path, mode_t mode = 0,
const SocketInterface* sock_interface = nullptr);
// Always reports OK.
static absl::Status validateProtocolSupported() { return absl::OkStatus(); }
// Network::Address::Instance
// Equality is defined by the address strings (see the out-of-line definition).
bool operator==(const Instance& rhs) const override;
// This address type has no IP representation.
const Ip* ip() const override { return nullptr; }
// Exposes the stored sockaddr_un state through the Pipe interface.
const Pipe* pipe() const override { return &pipe_; }
const EnvoyInternalAddress* envoyInternalAddress() const override { return nullptr; }
const sockaddr* sockAddr() const override {
return reinterpret_cast<const sockaddr*>(&pipe_.address_);
}
const sockaddr_un& getSockAddr() const { return pipe_.address_; }
// For abstract namespace addresses the length is the offset of sun_path plus the stored name
// length; otherwise it is the full size of sockaddr_un.
socklen_t sockAddrLen() const override {
if (pipe_.abstract_namespace_) {
return offsetof(struct sockaddr_un, sun_path) + pipe_.address_length_;
}
return sizeof(pipe_.address_);
}
absl::string_view addressType() const override { return "default"; }
private:
/**
 * Construct from an existing unix address.
 * Store the error status code in passed in parameter instead of throwing.
 * It is called by the factory method and the partially constructed instance will be discarded
 * upon error.
 */
RtuInstance(absl::Status& error, const sockaddr_un* address, socklen_t ss_len, mode_t mode = 0,
const SocketInterface* sock_interface = nullptr);
// Pipe implementation that holds the raw address together with its abstract-namespace flag,
// name length and mode.
struct PipeHelper : public Pipe {
bool abstractNamespace() const override { return abstract_namespace_; }
mode_t mode() const override { return mode_; }
sockaddr_un address_;
// For abstract namespaces.
bool abstract_namespace_{false};
uint32_t address_length_{0};
mode_t mode_{0};
};
// Copies the address into pipe_, sets the friendly name and the mode; returns an error status
// when a non-zero mode is given for an abstract namespace address.
absl::Status initHelper(const sockaddr_un* address, mode_t mode);
PipeHelper pipe_;
friend class InstanceFactory;
};
//RTU
// Builds the address from a raw sockaddr_un. A leading null byte in sun_path marks an abstract
// namespace address, whose name length is derived from ss_len. The status returned by
// initHelper() is handed to throwOnError().
RtuInstance::RtuInstance(const sockaddr_un* address, socklen_t ss_len, mode_t mode,
                         const SocketInterface* sock_interface)
    : InstanceBase(Type::Pipe, sockInterfaceOrDefault(sock_interface)) {
  const bool abstract_path = address->sun_path[0] == '\0';
  if (abstract_path) {
#if !defined(__linux__)
    throw EnvoyException("Abstract AF_UNIX sockets are only supported on linux.");
#endif
    // The length must cover at least the leading null byte of the abstract name.
    RELEASE_ASSERT(static_cast<unsigned int>(ss_len) >= offsetof(struct sockaddr_un, sun_path) + 1,
                   "");
    pipe_.address_length_ = ss_len - offsetof(struct sockaddr_un, sun_path);
    pipe_.abstract_namespace_ = true;
  }

  absl::Status init_status = initHelper(address, mode);
  throwOnError(init_status);
}
// Builds the address from a string path. Paths starting with '@' are stored as abstract
// namespace names (Linux only, mode must be 0); every other path is copied as a regular
// null-terminated filesystem path. Throws EnvoyException when the path is too long, contains
// embedded nulls (filesystem case), or violates the abstract namespace restrictions.
RtuInstance::RtuInstance(const std::string& pipe_path, mode_t mode,
                         const SocketInterface* sock_interface)
    : InstanceBase(Type::Pipe, sockInterfaceOrDefault(sock_interface)) {
  const size_t path_len = pipe_path.size();
  const size_t max_path_len = sizeof(pipe_.address_.sun_path);
  if (path_len >= max_path_len) {
    throw EnvoyException(
        fmt::format("Path \"{}\" exceeds maximum UNIX domain socket path size of {}.", pipe_path,
                    max_path_len));
  }

  memset(&pipe_.address_, 0, sizeof(pipe_.address_));
  pipe_.address_.sun_family = AF_UNIX;

  const bool abstract_path = pipe_path[0] == '@';
  if (!abstract_path) {
    // A regular path must not hide a null byte in the middle of the string.
    if (strlen(pipe_path.c_str()) != path_len) {
      throw EnvoyException("UNIX domain socket pathname contains embedded null characters");
    }
    StringUtil::strlcpy(pipe_.address_.sun_path, pipe_path.c_str(), max_path_len);
    friendly_name_ = pipe_.address_.sun_path;
    pipe_.mode_ = mode;
    return;
  }

  // Abstract namespace: null bytes inside the name carry no special meaning, so the whole of
  // pipe_path is copied, the leading '@' is replaced by a null byte and the result is null
  // terminated. The friendly name is produced by friendlyNameFromAbstractPath().
#if !defined(__linux__)
  throw EnvoyException("Abstract AF_UNIX sockets are only supported on linux.");
#endif
  if (mode != 0) {
    throw EnvoyException("Cannot set mode for Abstract AF_UNIX sockets");
  }
  pipe_.abstract_namespace_ = true;
  pipe_.address_length_ = path_len;
  // Safe: path_len was checked against the sun_path capacity at the top of this function.
  memcpy(&pipe_.address_.sun_path[0], pipe_path.data(), path_len); // NOLINT(safe-memcpy)
  pipe_.address_.sun_path[0] = '\0';
  pipe_.address_.sun_path[path_len] = '\0';
  const absl::string_view abstract_name(pipe_.address_.sun_path, pipe_.address_length_);
  friendly_name_ = friendlyNameFromAbstractPath(abstract_name);
  pipe_.mode_ = mode;
}
// Non-throwing variant of the sockaddr_un constructor: the outcome is written to `error`
// instead of being thrown.
RtuInstance::RtuInstance(absl::Status& error, const sockaddr_un* address, socklen_t ss_len,
                         mode_t mode, const SocketInterface* sock_interface)
    : InstanceBase(Type::Pipe, sockInterfaceOrDefault(sock_interface)) {
  // Regular (non-abstract) path: nothing to derive from ss_len.
  if (address->sun_path[0] != '\0') {
    error = initHelper(address, mode);
    return;
  }

#if !defined(__linux__)
  error = absl::FailedPreconditionError("Abstract AF_UNIX sockets are only supported on linux.");
  return;
#endif
  // The length must cover at least the leading null byte of the abstract name.
  RELEASE_ASSERT(static_cast<unsigned int>(ss_len) >= offsetof(struct sockaddr_un, sun_path) + 1,
                 "");
  pipe_.address_length_ = ss_len - offsetof(struct sockaddr_un, sun_path);
  pipe_.abstract_namespace_ = true;
  error = initHelper(address, mode);
}
// Two addresses compare equal when their string forms are identical.
bool RtuInstance::operator==(const Instance& rhs) const {
  const auto& own_text = asString();
  const auto& other_text = rhs.asString();
  return own_text == other_text;
}
// Copies `address` into pipe_, derives the friendly name and records `mode`. Returns a
// FailedPrecondition status when a non-zero mode is requested for an abstract namespace address;
// in that case the address has already been copied but the name and mode are left untouched.
absl::Status RtuInstance::initHelper(const sockaddr_un* address, mode_t mode) {
  pipe_.address_ = *address;

  if (!pipe_.abstract_namespace_) {
    friendly_name_ = address->sun_path;
  } else if (mode != 0) {
    return absl::FailedPreconditionError("Cannot set mode for Abstract AF_UNIX sockets");
  } else {
    // Null characters in the abstract name are replaced with '@' in friendly_name_.
    const absl::string_view abstract_name(pipe_.address_.sun_path, pipe_.address_length_);
    friendly_name_ = friendlyNameFromAbstractPath(abstract_name);
  }

  pipe_.mode_ = mode;
  return absl::OkStatus();
}
加入新的常量:
// Kinds of address; Rtu is the newly added value for serial (RTU) addresses.
// NOTE(review): the RtuInstance constructors shown in this article still pass Type::Pipe to
// InstanceBase -- confirm whether they should use Type::Rtu.
enum class Type { Ip, Pipe, EnvoyInternal, Rtu };
2、引入第三方串口库
第三方串口库我使用的是
GitHub - crayzeewulf/libserial: Serial Port Programming in C++
定义iot.bzl
load("@envoy_api//bazel:envoy_http_archive.bzl", "envoy_http_archive")
load("@envoy_api//bazel:external_deps.bzl", "load_repository_locations")
load("repository_locations.bzl", "WATCHERMEN_REPOSITORY_LOCATIONS_SPEC")
# Returns BUILD file content declaring a public filegroup named "all" that globs every file of
# an external archive (e.g. cares), leaving out the patterns listed in `exclude`.
def _build_all_content(exclude = []):
    template = """filegroup(name = "all", srcs = glob(["**"], exclude={}), visibility = ["//visibility:public"])"""
    return template.format(repr(exclude))
# Default BUILD content: a filegroup "all" over every file, with nothing excluded.
BUILD_ALL_CONTENT = _build_all_content()
# Repository locations produced by load_repository_locations from the spec in
# repository_locations.bzl.
WATCHERMEN_REPOSITORY_LOCATIONS = load_repository_locations(WATCHERMEN_REPOSITORY_LOCATIONS_SPEC)
# Use this macro to reference any HTTP archive from bazel/repository_locations.bzl.
# Forwards `name` and any extra keyword arguments to envoy_http_archive, with the locations
# fixed to WATCHERMEN_REPOSITORY_LOCATIONS.
def external_http_archive(name, **kwargs):
envoy_http_archive(
name,
locations = WATCHERMEN_REPOSITORY_LOCATIONS,
**kwargs
)
# Registers the serial-port library: declares the "com_github_serial" archive with the
# catch-all BUILD content, then binds the name "serial" to the //bazel/foreign_cc:serial target.
def watchermen_iot_dependencies():
external_http_archive(
name = "com_github_serial",
build_file_content = BUILD_ALL_CONTENT,
)
native.bind(
name = "serial",
actual = "//bazel/foreign_cc:serial",
)
定义仓库字典:
# This should match the schema defined in external_deps.bzl.
# Location spec for the serial-port library archive (crayzeewulf/libserial).
WATCHERMEN_REPOSITORY_LOCATIONS_SPEC = dict(
com_github_serial = dict(
project_name = "serial",
# NOTE(review): the project page describes libserial as a C++ library -- confirm the wording.
project_desc = "C library for serial port",
project_url = "https://github.com/crayzeewulf/libserial",
# NOTE(review): the archive is taken from the head of the master branch while sha256 is fixed;
# presumably the checksum stops matching once the branch moves -- consider pinning a tag or
# commit.
version = "master",
strip_prefix = "libserial-{version}",
# urls = ["https://github.com/crayzeewulf/libserial/archive/refs/tags/v{version}.tar.gz"],
urls = ["https://github.com/crayzeewulf/libserial/archive/refs/heads/master.zip"],
release_date = "2022-05-29",
sha256 = "9f0c6137e56027d496a205072c527d47f552d4c170f24ae5cea2668da54e2a1b",
use_category = ["dataplane_core"],
# NOTE(review): verify that this CPE actually identifies crayzeewulf/libserial.
cpe = "cpe:2.3:a:c-serial_project:c-serial:*",
license = "libserial",
license_url = "https://github.com/crayzeewulf/libserial/blob/master/LICENSE.txt",
),
)
加入bazel cmake
# CMake build of the serial-port library from the "all" filegroup of @com_github_serial.
# Tests and examples are switched off and the static library libserial.a is the declared output.
envoy_cmake(
name = "serial",
lib_source = "@com_github_serial//:all",
cache_entries = {
# "CMAKE_INSTALL_LIBDIR": "lib",
# "CMAKE_CXX_COMPILER_FORCED": "on",
"LIBSERIAL_ENABLE_TESTING": "off",
"LIBSERIAL_BUILD_EXAMPLES": "off",
},
# linkopts = select({
# # "//bazel:apple": ["-lresolv"],
# "//conditions:default": [],
# }),
cmake_files_dir = "$BUILD_TMPDIR/",
out_static_libs = select({
"//conditions:default": ["libserial.a"],
}),
# NOTE(review): the disabled postfix_script below refers to c-ares headers; presumably a
# leftover from the rule this one was copied from.
# postfix_script = select({
# # "//bazel:windows_x86_64": "cp -L $EXT_BUILD_ROOT/external/com_github_libserial/src/lib/ares_nameser.h $INSTALLDIR/include/ares_nameser.h && cp -L $EXT_BUILD_ROOT/external/com_github_c_ares_c_ares/include/ares_dns.h $INSTALLDIR/include/ares_dns.h",
# # "//conditions:default": "rm -f $INSTALLDIR/include/ares_dns.h && cp -L $EXT_BUILD_ROOT/external/com_github_c_ares_c_ares/include/ares_dns.h $INSTALLDIR/include/ares_dns.h",
# }),
)
在Envoy模块中引入串口库
# Library holding the RTU listener socket and socket handle sources. The serial-port library is
# pulled in through external_deps ("serial").
envoy_cc_library(
name = "watchermen_iot_factory_lib",
hdrs = [
"watchermen_rtu_socket_handle.h",
"watchermen_rtu_listener_socket.h",
],
srcs = [
"watchermen_rtu_listener_socket.cc",
"watchermen_rtu_socket_handle.cc",
],
repository = "@envoy",
external_deps = ["serial"],
deps = [
# "//external:serial"
],
)