这篇文章主要介绍了Java多线程怎么实现FTP批量上传文件的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java多线程怎么实现FTP批量上传文件文章都会有所收获,下面我们一起来看看吧。
1、构建FTP客户端
package cn.com.pingtech.common.ftp;import lombok.extern.slf4j.Slf4j;import org.apache.commons.net.ftp.FTPClient;import org.apache.commons.net.ftp.FTPReply;import java.io.*;import java.net.UnknownHostException;@Slf4jpublic class FtpConnection { private FTPClient ftp = new FTPClient(); private boolean is_connected = false; public FtpConnection() { is_connected = false; ftp.setDefaultTimeout(FtpConfig.defaultTimeoutSecond * 1000); ftp.setConnectTimeout(FtpConfig.connectTimeoutSecond * 1000); ftp.setDataTimeout(FtpConfig.dataTimeoutSecond * 1000); try { initConnect(FtpConfig.host, FtpConfig.port, FtpConfig.user, FtpConfig.password); } catch (IOException e) { e.printStackTrace(); } } private void initConnect(String host, int port, String user, String password) throws IOException { try { ftp.connect(host, port); } catch (UnknownHostException ex) { throw new IOException("Can't find FTP server '" + host + "'"); } int reply = ftp.getReplyCode();//220 连接成功 if (!FTPReply.isPositiveCompletion(reply)) { disconnect(); throw new IOException("Can't connect to server '" + host + "'"); } if (!ftp.login(user, password)) { is_connected = false; disconnect(); throw new IOException("Can't login to server '" + host + "'"); } else { is_connected = true; } } public boolean upload(String path, String ftpFileName, File localFile) throws IOException { boolean is = false; //检查本地文件是否存在 if (!localFile.exists()) { throw new IOException("Can't upload '" + localFile.getAbsolutePath() + "'. This file doesn't exist."); } //设置工作路径 setWorkingDirectory(path); //上传 InputStream in = null; try { //被动模式 ftp.enterLocalPassiveMode(); in = new BufferedInputStream(new FileInputStream(localFile)); //保存文件 is = ftp.storeFile(ftpFileName, in); }catch (Exception e){ e.printStackTrace(); } finally { try { in.close(); } catch (IOException ex) { ex.printStackTrace(); } } return is; } public void disconnect() throws IOException { if (ftp.isConnected()) { try { ftp.logout(); ftp.disconnect(); is_connected = false; } catch (IOException ex) { ex.printStackTrace(); } } } private boolean setWorkingDirectory(String dir) { if (!is_connected) { return false; } //如果目录不存在创建目录 try { if (createDirecroty(dir)) { return ftp.changeWorkingDirectory(dir); } } catch (IOException e) { e.printStackTrace(); } return false; } public boolean isConnected() { return is_connected; } private boolean createDirecroty(String remote) throws IOException { boolean success = true; String directory = remote.substring(0, remote.lastIndexOf("/") + 1); // 如果远程目录不存在,则递归创建远程服务器目录 if (!directory.equalsIgnoreCase("/") && !ftp.changeWorkingDirectory(new String(directory))) { int start = 0; int end = 0; if (directory.startsWith("/")) { start = 1; } else { start = 0; } end = directory.indexOf("/", start); while (true) { String subDirectory = new String(remote.substring(start, end)); if (!ftp.changeWorkingDirectory(subDirectory)) { if (ftp.makeDirectory(subDirectory)) { ftp.changeWorkingDirectory(subDirectory); } else { log.error("mack directory error :/" + subDirectory); return false; } } start = end + 1; end = directory.indexOf("/", start); // 检查所有目录是否创建完毕 if (end <= start) { break; } } } return success; }}
2、FTP连接工厂
package cn.com.pingtech.common.ftp;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.util.concurrent.ArrayBlockingQueue;@Slf4jpublic class FtpFactory { //有界队列 private static final ArrayBlockingQueue<FtpConnection> arrayBlockingQueue = new ArrayBlockingQueue<>(FtpConfig.ftpConnectionSize); protected FtpFactory(){ log.info("init ftpConnectionSize "+FtpConfig.ftpConnectionSize); for(int i = 0; i< FtpConfig.ftpConnectionSize; i++){ //表示如果可能的话,将 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false arrayBlockingQueue.offer(new FtpConnection()); } } public FtpConnection getFtp() { FtpConnection poll = null; try { //取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 Blocking 有新的对象被加入为止 poll = arrayBlockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return poll; } public boolean relase(FtpConnection ftp){ return arrayBlockingQueue.offer(ftp); } public void remove(FtpConnection ftp) { arrayBlockingQueue.remove(ftp); } public void close() { for (FtpConnection connection : arrayBlockingQueue) { try { connection.disconnect(); } catch (IOException e) { e.printStackTrace(); } } }}
3、FTP配置
package cn.com.pingtech.common.ftp;public class FtpConfig { public static int defaultTimeoutSecond = 10; public static int connectTimeoutSecond = 10; public static int dataTimeoutSecond = 10; public static String host = "127.0.0.1"; public static int port =9999; public static String user = "Administrator"; public static String password ="Yp886611"; public static int threadPoolSize = 1; public static int ftpConnectionSize = 1; }
4、构建多线程FTP上传任务
package cn.com.pingtech.common.ftp;import java.io.File;import java.io.IOException;import java.util.concurrent.Callable;public class UploadTask implements Callable{ private File file; private FtpConnection ftp; private String path; private String fileName; private FtpFactory factory; public UploadTask(FtpFactory factory,FtpConnection ftp, File file, String path, String fileName){ this.factory = factory; this.ftp = ftp; this.file = file; this.path = path; this.fileName = fileName; } @Override public UploadResult call() throws Exception { UploadResult result = null; try { if (ftp == null) { result = new UploadResult(file.getAbsolutePath(), false); return result; } //如果连接未开启 重新获取连接 if (!ftp.isConnected()) { factory.remove(ftp); ftp = new FtpConnection(); } //开始上传 result = new UploadResult(file.getName(), ftp.upload(path, fileName, file)); } catch (IOException ex) { result = new UploadResult(file.getName(), false); ex.printStackTrace(); } finally { factory.relase(ftp);//释放连接 } return result; }}
package cn.com.pingtech.common.ftp;public class UploadResult { private String fileName; //文件名称 private boolean result; //是否上传成功 public UploadResult(String fileName, boolean result) { this.fileName = fileName; this.result = result; } public String getFileName() { return fileName; } public void setFileName(String fileName) { this.fileName = fileName; } public boolean isResult() { return result; } public void setResult(boolean result) { this.result = result; } public String toString() { return "[fileName=" + fileName + " , result=" + result + "]"; }}
注意:实现Callable接口的任务线程能返回执行结果
Callable接口支持返回执行结果,此时需要调用FutureTask.get()方法实现,此方法会阻塞线程直到获取“将来”的结果,当不调用此方法时,主线程不会阻塞
5、FTP上传工具类
package cn.com.pingtech.common.ftp;import java.io.File;import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.TimeUnit;public class FtpUtil { public static synchronized List upload(String ftpPath, File[] listFiles) { //构建线程池 ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(FtpConfig.threadPoolSize); List<Future> results = new ArrayList<>(); //创建n个ftp链接 FtpFactory factory = new FtpFactory(); for (File file : listFiles) { FtpConnection ftp = factory.getFtp();//获取ftp con UploadTask upload = new UploadTask(factory,ftp, file, ftpPath, file.getName()); Future submit = newFixedThreadPool.submit(upload); results.add(submit); } List listResults = new ArrayList<>(); for (Future result : results) { try { //获取线程结果 UploadResult uploadResult = (UploadResult)result.get(30, TimeUnit.MINUTES); listResults.add(uploadResult); } catch (Exception e) { e.printStackTrace(); } } factory.close(); newFixedThreadPool.shutdown(); return listResults; }}
6、测试上传
package cn.com.pingtech.common.ftpclass Client { public static void main(String[] args) throws IOException { String loalPath = "C:\\Users\\Administrator\\Desktop\\test\\0"; String ftpPath = "/data/jcz/"; File parentFile = new File(loalPath); List <UploadResult> list = FtpUtil.upload(ftpPath,parentFile.listFiles()); for(UploadResult vo:list){ System.out.println(vo); } }}
注意:FTP协议里面,规定文件名编码为iso-8859-1,所以目录名或文件名需要转码
关于“Java多线程怎么实现FTP批量上传文件”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“Java多线程怎么实现FTP批量上传文件”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程网行业资讯频道。