前言
可以先参考前面一篇文章 retrieveFileStream 之后需要调用 completePendingCommand 否则业务代码会存在问题 retrieveFileStream 之后需要调用 completePendingCommand 否则业务代码会存在问题
这里的问题 主要是来自于 某 qq 交流群的网友
呵呵 当然 这里测试用例代码的问题也是 非常多, 我们这里 只关注于 造成业务异常的相关部分
测试用例
/**
* @author Jerry.X.He <970655147@qq.com>
* @version 1.0
* @date 2022-09-16 15:22
*/
public class Test20MultiThreadDownloadFile implements Runnable {
public static void main(String[] args) {
for (int i = 0; i < 6; i++) {
new Thread(new Test20MultiThreadDownloadFile()).start();
}
}
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
public void run() {
while (true) {
try {
syncFile();
} catch (Exception e) {
System.err.println(ExceptionUtils.getFullStackTrace(e));
} finally {
try {
Thread.sleep(1000 * 60 * 1);
} catch (InterruptedException e) {
System.err.println(ExceptionUtils.getFullStackTrace(e));
}
}
}
}
public void syncFile() {
FTPClient ftpClient = new FTPClient();
try {
ftpClient.connect("localhost", 21);
if (FTPReply.isPositiveCompletion(ftpClient.getReplyCode())) {
if (ftpClient.login("root", "root")) {
System.out.println("FTP连接成功");
}
}
ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
ftpClient.enterLocalPassiveMode();
ftpClient.setFileTransferMode(FTP.BINARY_FILE_TYPE);
ftpClient.changeWorkingDirectory("/");
FTPFile[] files = ftpClient.listFiles();
for (FTPFile ftpFile : files) {
boolean isPending = false;
try {
//if(StringUtils.equals(DateUtil.formatDate(new Date(), "yyyyMMdd"), ftpFile.getName().substring(28,36))){
System.out.println(Thread.currentThread().getName() + "|" + ftpFile.getName() + "开始同步");
InputStream input = ftpClient.retrieveFileStream(ftpFile.getName());
// isPending = (input != null);
//File localFile = new File(ComUtils.getAttachFilePathPrefix()+SysConts.GROUPBILL_ATTA_PATH+ftpFile.getName());
File localFile = new File("/Users/jerry/Tmp/Test20MultiThreadDownloadFile/" + ftpFile.getName());
localFile.getParentFile().mkdirs();
if (!localFile.exists()) {
localFile.createNewFile();
}
if (localFile.length() > 10) {
System.out.println(Thread.currentThread().getName() + "|文件已同步,跳过" + ftpFile.getName());
continue;
}
if (!localFile.canWrite()) {
System.out.println(Thread.currentThread().getName() + "|文件不能操作,跳过" + ftpFile.getName());
continue;
}
FileOutputStream out = new FileOutputStream(localFile);
byte[] buffer = new byte[1024];
int read;
while ((read = input.read(buffer, 0, buffer.length)) != -1) {
out.write(buffer, 0, read);
}
out.flush();
out.close();
input.close();
isPending = true;
System.out.println(Thread.currentThread().getName() + "|文件同步完成," + ftpFile.getName());
//}
} catch (Exception e) {
System.err.println(ExceptionUtils.getFullStackTrace(e));
} finally {
if (isPending) {
ftpClient.completePendingCommand();
System.out.println(Thread.currentThread().getName() + "发起结束命令");
}
}
}
} catch (Exception e) {
System.err.println(ExceptionUtils.getFullStackTrace(e));
} finally {
try {
if (ftpClient.isConnected()) {
ftpClient.disconnect();
System.out.println("FTP退出成功");
}
} catch (IOException e) {
System.err.println(ExceptionUtils.getFullStackTrace(e));
}
}
}
}
相关问题剖析
这里会出现 几种导致问题的情况, 主要的问题是 retrieveFileStream 之后未消费 retrieveFileStream 的第二次响应
1. ”if (localFile.length() > 10) {“ 中判断了目标文件是否已经存在, 如果存在 则跳过, isPending 为 false, 这里是可能 未消费 retrieveFileStream 的第二次响应的
2. ”if (!localFile.canWrite()) {“ 中判断了目标文件是否可以操作, 如果不可写 则跳过, isPending 为 false, 这里是可能 未消费 retrieveFileStream 的第二次响应的
3. “while ((read = input.read(buffer, 0, buffer.length)) != -1) {” 中 input 时可能为 null, 的因为 pasv 请求可能重试了 10 次之后绑定数据端口不成功, 然后 input 就是 null, 之后走的 catch, finally, isPending 为 false, 这里是可能 未消费 pasv 的响应异常的
如下图是 场景三 的响应
问题的处理方式
将 isPending 调整为如下即可
对于 "500 OOPS: vsf_sysutil_bind" 因为重试次数是代码里面固定死的
因此 要么调整被动模式的端口数量, 要么调整 客户端线程数量, 降低冲突的概率
被动模式 vsftpd 开放数据端口
重试 10 次, 随机碰撞端口
完