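OpenIPC Open-Source FPV: Adaptive-Link Ground Station Code Walkthrough
- 1. Background
- 2. Code Framework
- 3. Software Configuration
- 3.1 Default Configuration
- 3.2 Loading the Configuration
- 3.3 Updating the Configuration
- 4. Communication Routines
- 4.1 TCP Message Parsing
- 4.2 UDP Message Sending
- 5. Special Commands
- 5.1 request_keyframe
- 5.2 drop_gop
- 5.3 resume_adaptive
- 5.4 pause_adaptive
- 6. Summary
- 7. References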
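1. Background
"OpenIPC Open-Source FPV: Adaptive-Link Project Overview" gave a high-level picture of the whole project; the next step is to go through the code piece by piece.
The air unit code was studied in an earlier article. This article analyzes the ground station code: alink_gs.py
2. Code Framework
- Config file: "/etc/adaptive_link.conf"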
- wfb-cli:127.0.0.1:8003
- UDP: 10.5.0.10:9999
__main__
├──> load_config(args.config) // --config "/etc/adaptive_link.conf"
└──> connect_and_receive_msgpack()
Note: connect_and_receive_msgpack
sets up both the TCP and the UDP communication channels.
3. Software Configuration
3.1 Default Configuration
VERSION = "0.1.1"
# Global variables
results = []
link_health_score_rssi = 1000 # Default score before calculation for RSSI
link_health_score_snr = 1000 # Default score before calculation for SNR
recovered_packets = 0 # To store the number of recovered packets (FEC)
lost_packets = 0 # To store the number of lost packets
best_antennas_rssi = [-105, -105, -105, -105] # Default values for best antennas RSSI if less than 4
best_antennas_snr = [-105, -105, -105, -105] # Default values for best antennas SNR if less than 4
config = None # Configuration file object
udp_socket = None # To store the UDP socket globally
udp_ip = None # To store the UDP IP globally
udp_port = None # To store the UDP port globally
previous_link_health_score_rssi = None
previous_link_health_score_snr = None
# Default config values
DEFAULT_CONFIG = {
    'Settings': {
        'version': VERSION,  # Current version of the script
        'message_interval': '100',  # Time between sending messages (milliseconds)
        'use_best_rssi': 'True',  # Option to pick best available RSSI for link health score
        'min_rssi': '-85',  # Min RSSI range for link health
        'max_rssi': '-50',  # Max RSSI range for link health
        'min_snr': '12',  # Min SNR range for link health
        'max_snr': '28',  # Max SNR range for link health
        'host': '127.0.0.1',  # Host address to connect to
        'port': '8003',  # Port to connect to
        'udp_ip': '10.5.0.10',  # UDP IP to send link health data
        'udp_port': '9999',  # UDP port to send link health data
        'retry_interval': '1',  # Time in seconds to wait before retrying TCP connection on failure
    },
    'Descriptions': {
        'version': 'Version of the script',
        'message_interval': 'Time between sending UDP messages in milliseconds',
        'use_best_rssi': 'Use True to pick the best available RSSI for health score, or False for average RSSI',
        'min_rssi': 'Minimum RSSI value used for link health calculation',
        'max_rssi': 'Maximum RSSI value used for link health calculation',
        'min_snr': 'Minimum SNR value used for link health calculation',
        'max_snr': 'Maximum SNR value used for link health calculation',
        'host': 'Host address to connect to for the TCP connection',
        'port': 'Port to connect to for the TCP connection',
        'udp_ip': 'UDP IP to send the link health data',
        'udp_port': 'UDP port to send the link health data',
        'retry_interval': 'Time in seconds to wait before retrying TCP connection on failure'
    }
}
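Every default above is stored as a string, so the script has to convert values before using them. The following is a minimal sketch of that conversion with the standard configparser module. It uses only a subset of the keys, and the variable names on the left-hand side are illustrative rather than taken from alink_gs.py.

```python
import configparser

# Subset of the defaults listed above; every value is a string.
defaults = {
    'Settings': {
        'message_interval': '100',
        'use_best_rssi': 'True',
        'min_rssi': '-85',
        'max_rssi': '-50',
        'udp_port': '9999',
    }
}

config = configparser.ConfigParser()
config.read_dict(defaults)

settings = config['Settings']
# Typed accessors convert the stored strings on read.
message_interval_s = settings.getint('message_interval') / 1000  # ms -> s
use_best_rssi = settings.getboolean('use_best_rssi')
min_rssi = settings.getint('min_rssi')
max_rssi = settings.getint('max_rssi')
udp_port = settings.getint('udp_port')
```

getboolean accepts values such as 'True', 'yes' and '1', which is more forgiving than comparing the raw string by hand.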
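3.2 Loading the Configuration
The configuration is read from "/etc/adaptive_link.conf". If the file does not exist, the script generates one populated with the default values.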
load_config(config_file='config.ini')
├──> Check if config file exists
│ ├──> File does not exist
│ │ ├──> Print "Config file not found, creating..."
│ │ ├──> Load default values into config (DEFAULT_CONFIG)
│ │ └──> Write default config to file
│ └──> File exists
│ └──> Read existing config file
├──> Check version mismatch
│ ├──> Config version != VERSION
│ │ ├──> Print "Updating version in config file..."
│ │ ├──> Update config['Settings']['version'] to VERSION
│ │ └──> Call update_version_history(config_file)
│ └──> Config version == VERSION (do nothing)
├──> Ensure all default fields are present
│ ├──> Iterate through DEFAULT_CONFIG sections and keys
│ │ ├──> Section missing → Add section to config
│ │ ├──> Key missing in section → Add key to config
│ │ └──> Mark config as updated
├──> Save updated config (if needed)
│ └──> Write updated config to file
└──> Return (config is now fully loaded and updated)
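The flow above can be sketched with configparser as follows. This is an illustrative reconstruction of the steps in the tree, not the code from alink_gs.py: the name load_config_sketch, the reduced DEFAULTS dictionary, and the omission of the version-history step are all assumptions made to keep the example short.

```python
import configparser
import os

VERSION = "0.1.1"
# Reduced stand-in for DEFAULT_CONFIG (assumption: only three keys).
DEFAULTS = {'Settings': {'version': VERSION,
                         'message_interval': '100',
                         'udp_port': '9999'}}

def load_config_sketch(config_file, defaults=DEFAULTS):
    """Load config_file, creating or completing it from defaults."""
    config = configparser.ConfigParser()
    updated = False

    if os.path.exists(config_file):
        config.read(config_file)
    else:
        print(f"Config file {config_file} not found, creating...")
        config.read_dict(defaults)
        updated = True

    # Version mismatch: overwrite the stored version.
    if config.get('Settings', 'version', fallback=None) != VERSION:
        if not config.has_section('Settings'):
            config.add_section('Settings')
        config['Settings']['version'] = VERSION
        updated = True

    # Add any section or key that the file is missing.
    for section, values in defaults.items():
        if not config.has_section(section):
            config.add_section(section)
            updated = True
        for key, value in values.items():
            if key not in config[section]:
                config[section][key] = value
                updated = True

    # Write back only when something changed.
    if updated:
        with open(config_file, 'w') as f:
            config.write(f)
    return config
```

Values already present in the file are left untouched, so user edits survive a version bump; only missing keys are filled in.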
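3.3 Updating the Configuration
If the configuration file does not exist, a default one is generated directly. When load_config detects a version mismatch, it calls update_version_history, which records the current version with a UTC timestamp in a "Version History" section and rewrites the file.
Note: upgrading the contents of an existing configuration file is not supported yet.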
def update_version_history(config_file):
    """
    Updates the version history in the configuration file.
    """
    if 'Version History' not in config:
        config['Version History'] = {}
    # Add the current version with a timestamp
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime())
    config['Version History'][VERSION] = f"Version {VERSION} updated on {timestamp}"
    # Write the updated config to file
    with open(config_file, 'w') as f:
        config.write(f)
4. Communication Routines
- UDP: 10.5.0.10:9999
- TCP: 127.0.0.1:8003
connect_and_receive_msgpack()
├──> Initialize UDP socket and thread // 10.5.0.10:9999
│ ├──> Create global UDP socket (udp_socket)
│ └──> Start generate_package() in a separate thread
├──> Infinite loop to handle TCP connection and data // 127.0.0.1:8003
│ ├──> Try to connect to the server
│ │ ├──> Create TCP socket (client_socket)
│ │ ├──> Connect to (host, port)
│ │ └──> Read data from server in a loop
│ │ ├──> Read 4-byte length prefix
│ │ ├──> Extract message length (msg_length)
│ │ ├──> Read actual message data of length msg_length
│ │ │ ├──> Append chunks of data until full message is received
│ │ │ └──> Handle incomplete data or connection issues
│ │ └──> Unpack and process MessagePack data
│ │ ├──> Append unpacked data to results array
│ │ ├──> Handle "video rx" message
│ │ │ └──> Update link health metrics (RSSI, SNR, etc.)
│ │ └──> Handle unpack errors
│ └──> Handle connection issues
│ └──> Print error and retry connection after retry_interval seconds
└──> Return (loop runs indefinitely)
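The length-prefixed read in the middle of this flow is the part that is easiest to get wrong, because recv may return fewer bytes than requested. Below is a minimal sketch of that step only. The helper names recv_exact and read_frame are hypothetical, and the prefix is assumed to be a 4-byte unsigned integer in network byte order, the same layout send_udp uses. Decoding the payload with the third-party msgpack package is left out so that the example runs on the standard library alone.

```python
import socket
import struct

def recv_exact(sock, n):
    """Read exactly n bytes, or return None if the peer closes first."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(min(remaining, 4096))
        if not chunk:  # connection closed mid-message
            return None
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)

def read_frame(sock):
    """Read one frame: 4-byte big-endian length, then the payload."""
    header = recv_exact(sock, 4)
    if header is None:
        return None
    (msg_length,) = struct.unpack('!I', header)
    return recv_exact(sock, msg_length)

# Demo over a local socket pair instead of a real wfb-cli connection.
server, client = socket.socketpair()
payload = b'\x81\xa4type\xa2rx'  # MessagePack bytes for {"type": "rx"}
server.sendall(struct.pack('!I', len(payload)) + payload)
frame = read_frame(client)
server.close()
after_close = read_frame(client)  # peer is gone, so this is None
client.close()
```

In the real loop, a None result would be the cue to leave the inner read loop and reconnect after retry_interval seconds.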
4.1 TCP Message Parsing
Local RF statistics are obtained through the wfb-cli TCP port 8003.
calculate_link_health(video_rx)
├──> Global Variables Declaration
├──> Configuration Settings (Extract from `config`)
│ ├──> message_interval
│ ├──> use_best_rssi
│ ├──> min_rssi, max_rssi
│ └──> min_snr, max_snr
├──> Packets Data Retrieval (from `video_rx`)
│ ├──> fec_rec → recovered_packets
│ ├──> lost → lost_packets
│ └──> Special Messages Logic
│ ├──> request_keyframe()
│ └──> (Optional) drop_gop()
├──> Antenna Statistics Processing
│ ├──> Extract RSSI/SNR per antenna
│ └──> Fallback Logic (Default Values)
├──> Best Antennas Selection
│ ├──> Sort `rssi_values` and `snr_values`
│ └──> Pad with `-105` if fewer than 4 antennas
├──> Link Health Score Calculation
│ ├──> RSSI Health Score
│ │ ├──> rssi_to_use (Best vs Average)
│ │ └──> Interpolation Logic
│ ├──> SNR Health Score
│ │ ├──> avg_best_snr (Best vs Average)
│ │ └──> Interpolation Logic
│ └──> (Optional) Hysteresis Logic
├──> Rounding and Logging
└──> Return Scores (RSSI, SNR)
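The "Best Antennas Selection" and "Interpolation Logic" steps can be illustrated with a short sketch. It assumes the score is mapped linearly onto the range 1000 to 2000 and clamped at both ends. The lower bound matches the default score of 1000 in section 3.1, but the upper bound of 2000 and the helper names best_four and interpolate_score are assumptions, not values confirmed by the text above.

```python
def best_four(values, pad=-105):
    """Return the four strongest readings, padded with `pad` if fewer."""
    best = sorted(values, reverse=True)[:4]
    return best + [pad] * (4 - len(best))

def interpolate_score(value, low, high, score_min=1000, score_max=2000):
    """Map value in [low, high] linearly onto [score_min, score_max]."""
    if value <= low:
        return score_min
    if value >= high:
        return score_max
    fraction = (value - low) / (high - low)
    return score_min + fraction * (score_max - score_min)

# Two antennas reporting, default min_rssi / max_rssi from section 3.1.
rssi_values = [-72, -60]
best_antennas_rssi = best_four(rssi_values)

use_best_rssi = True
if use_best_rssi:
    rssi_to_use = best_antennas_rssi[0]
else:
    rssi_to_use = sum(rssi_values) / len(rssi_values)

score_rssi = round(interpolate_score(rssi_to_use, low=-85, high=-50))
```

With these inputs the best reading is -60 dBm, which sits 25/35 of the way through the configured range, so the rounded score is 1714.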
4.2 UDP Message Sending
- Periodically pack the latest values of the global variables into a message
def generate_package():
    """
    Generate, at interval, string with all the variables
    """
    message_interval = int(config['Settings']['message_interval']) / 1000  # Convert to seconds
    while True:
        timestamp = int(time.time())  # Get epoch time in seconds since 1970
        # Include best antennas in the message
        message = f"{timestamp}:{link_health_score_rssi}:{link_health_score_snr}:{recovered_packets}:{lost_packets}:{best_antennas_rssi[0]}:{best_antennas_rssi[1]}:{best_antennas_rssi[2]}:{best_antennas_rssi[3]}"
        # Pass the message to send_udp function
        send_udp(message)
        time.sleep(message_interval)  # Send at the specified interval
- Send the message to the air unit through the UDP tunnel
def send_udp(message):
    """
    Adds message length to the start and sends message on provided port
    """
    if verbose_mode:
        print("send_udp function has started")  # Debug statement to confirm function start
    # Prepend the size of the message as a 4-byte unsigned integer
    message_bytes = message.encode('utf-8')
    message_size = struct.pack('!I', len(message_bytes))  # Use network byte order (big-endian)
    # Full message with size prefix
    full_message = message_size + message_bytes
    if verbose_mode:
        print("Preparing UDP message to be sent")  # Debug statement
    # Send the message
    try:
        udp_socket.sendto(full_message, (udp_ip, udp_port))
        if verbose_mode:
            print(f"UDP Message Sent: {message} (size: {len(message_bytes)} bytes)")
    except Exception as e:
        if verbose_mode:
            print(f"Error sending UDP data: {e}")
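To make the wire format concrete, here is a sketch of how a receiver could undo the two steps above: strip the 4-byte length prefix, then split the colon-separated fields. The field names in FIELDS and the function names are illustrative labels chosen for this example; the actual air unit parser is written in C and is not reproduced here.

```python
import struct

# Illustrative names for the nine fields, in the order generate_package emits them.
FIELDS = ('timestamp', 'score_rssi', 'score_snr', 'recovered', 'lost',
          'ant1_rssi', 'ant2_rssi', 'ant3_rssi', 'ant4_rssi')

def frame_message(message):
    """Same framing as send_udp: 4-byte big-endian size, then UTF-8 text."""
    data = message.encode('utf-8')
    return struct.pack('!I', len(data)) + data

def parse_datagram(datagram):
    """Validate the length prefix and return the text payload."""
    if len(datagram) < 4:
        raise ValueError("datagram shorter than the length prefix")
    (size,) = struct.unpack('!I', datagram[:4])
    body = datagram[4:4 + size]
    if len(body) != size:
        raise ValueError("payload shorter than the declared size")
    return body.decode('utf-8')

def parse_telemetry(text):
    """Split the colon-separated payload into named numeric fields."""
    parts = text.split(':')
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(parts)}")
    return dict(zip(FIELDS, (float(p) for p in parts)))

message = "1700000000:1714:1500:2:0:-60:-72:-105:-105"
datagram = frame_message(message)
telemetry = parse_telemetry(parse_datagram(datagram))
```

Because UDP preserves datagram boundaries, the length prefix is not needed for framing here; it mainly lets the receiver detect a truncated payload.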
5. Special Commands
5.1 request_keyframe
def request_keyframe():
    """
    Send a special message to request a keyframe
    """
    special_message = "special:request_keyframe"
    num_attempts = 0  # Number of times to send the message (with 0 the loop never runs, so nothing is sent)
    for attempt in range(num_attempts):
        send_udp(special_message)  # Call send_udp to send the special message
        if verbose_mode:
            print(f"Sent special message: {special_message}, attempt {attempt + 1}/{num_attempts}")
        time.sleep(0.1)  # Wait before the next attempt
5.2 drop_gop
def drop_gop():
    """
    Send a special message to drop the gop
    """
    special_message = "special:drop_gop"
    num_attempts = 0  # Number of times to send the message (with 0 the loop never runs, so nothing is sent)
    for attempt in range(num_attempts):
        send_udp(special_message)  # Call send_udp to send the special message
        if verbose_mode:
            print(f"Sent special message: {special_message}, attempt {attempt + 1}/{num_attempts}")
        time.sleep(0.1)  # Wait before the next attempt
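Both commands share the "special:" prefix and travel over the same UDP channel as the periodic telemetry, so a receiver has to tell the two apart. The sketch below shows one way to do that. The handler table and the names classify_message and dispatch are hypothetical, and the handlers only record that they were called.

```python
SPECIAL_PREFIX = "special:"

def classify_message(text):
    """Return ('special', command) or ('telemetry', text)."""
    if text.startswith(SPECIAL_PREFIX):
        return ("special", text[len(SPECIAL_PREFIX):])
    return ("telemetry", text)

# Hypothetical handlers: they only log what a real receiver would act on.
handled = []
HANDLERS = {
    "request_keyframe": lambda: handled.append("keyframe requested"),
    "drop_gop": lambda: handled.append("gop dropped"),
}

def dispatch(text):
    """Run the handler for a special command; return True if one ran."""
    kind, body = classify_message(text)
    if kind != "special":
        return False
    handler = HANDLERS.get(body)
    if handler is None:
        return False  # unknown command, ignore it
    handler()
    return True

ran_keyframe = dispatch("special:request_keyframe")
ran_telemetry = dispatch("1700000000:1714:1500:2:0:-60:-72:-105:-105")
ran_unknown = dispatch("special:not_a_command")
```

Telemetry payloads start with a numeric timestamp, so they cannot be mistaken for a special command.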
5.3 resume_adaptive
TBD.
5.4 pause_adaptive
TBD.
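6. Summary
The ground station program is written mainly in Python, so it is portable across platforms and very convenient to deploy.
From an implementation standpoint, there are still many exception scenarios and open questions worth discussing and improving: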
- How to config/monitor 8812AU(gs) 8812EU(airunit) working status? #5
- The relationship between .c and binary files #7
- what’s the difference between hold_fallback_mode_s and hold_modes_down_s? #9
- WIP: Add support for RTL8812EU-based Wi-Fi adapters for FPV firmware #1344
- How to interpret each row of configuration data in txprofiles.conf? #10
7. References
【1】OpenIPC Open-Source FPV: Adaptive-Link Project Overview
【2】OpenIPC Open-Source FPV: Adaptive-Link Air Unit Code Walkthrough