# Source code for snakemake.remote.FTP

__author__ = "Christopher Tomkins-Tinch"
__copyright__ = "Copyright 2015, Christopher Tomkins-Tinch"
__email__ = "tomkinsc@broadinstitute.org"
__license__ = "MIT"

import os
import re
import ftplib
import collections
import collections.abc
from itertools import chain
from contextlib import contextmanager

# module-specific
from snakemake.remote import AbstractRemoteProvider, DomainObject
from snakemake.exceptions import FTPFileException, WorkflowError

try:
    # third-party modules
    import ftputil
    import ftputil.session
except ImportError as e:
    # Surface a workflow-level error with an actionable message; chain the
    # original ImportError so its traceback is not lost.
    raise WorkflowError("The Python 3 package 'ftputil' " +
        "must be installed to use FTP remote() file functionality. %s" % e.msg) from e


class RemoteProvider(AbstractRemoteProvider):
    """Remote provider for files addressed by ``ftp://`` or ``ftps://`` urls.

    Extra keyword arguments given here are stored by the base class and used
    by ``RemoteObject.ftpc`` as connection defaults for every file created by
    this provider.
    """

    supports_default = True
    allows_directories = True

    def __init__(self, *args, keep_local=False, stay_on_remote=False,
                 is_default=False, immediate_close=False, **kwargs):
        super(RemoteProvider, self).__init__(
            *args,
            keep_local=keep_local,
            stay_on_remote=stay_on_remote,
            is_default=is_default,
            **kwargs
        )
        # Provider-wide default; remote(immediate_close=...) can override it.
        self.immediate_close = immediate_close

    @property
    def default_protocol(self):
        """The protocol that is prepended to the path when no protocol is specified."""
        return 'ftp://'

    @property
    def available_protocols(self):
        """List of valid protocols for this remote provider."""
        return ['ftp://', 'ftps://']

    def remote(self, value, *args, encrypt_data_channel=None,
               immediate_close=None, **kwargs):
        """Wrap one or more FTP paths/urls as remote files.

        Args:
            value: a path/url string, or an iterable of them. Entries without
                a protocol get ``ftps://`` when *encrypt_data_channel* is
                truthy and ``ftp://`` otherwise.
            encrypt_data_channel: ``None`` (default) lets an explicit
                ``ftps://`` url imply encryption; an explicit value must agree
                with any explicit protocol in the url.
            immediate_close: per-call override of the provider-wide
                ``immediate_close``; ``None`` means "use the provider setting".

        Returns:
            The base-class ``remote`` result for a single entry, otherwise a
            list of such results (one per entry, in input order).

        Raises:
            TypeError: if *value* is neither a string nor an iterable.
            SyntaxError: if *encrypt_data_channel* contradicts the url protocol.
        """
        if isinstance(value, str):
            values = [value]
        elif isinstance(value, collections.abc.Iterable):
            # Copy: protocols are prefixed below, which must not mutate the
            # caller's list; this also accepts tuples and generators.
            values = list(value)
        else:
            raise TypeError('Invalid type ({}) passed to remote: {}'.format(type(value), value))

        # Compare against None so an explicit immediate_close=False can
        # override a provider-wide True.
        if immediate_close is not None:
            should_close = immediate_close
        else:
            should_close = self.immediate_close

        # First pass: validate every entry and normalize its protocol, so no
        # remote file is created when any entry is invalid.
        normalized = []
        for file in values:
            encrypt = encrypt_data_channel
            match = re.match('^(ftps?)://.+', file)
            if match:
                protocol, = match.groups()
                if protocol == 'ftps':
                    if encrypt_data_channel is not None and not encrypt_data_channel:
                        raise SyntaxError('encrypt_data_channel=False cannot be used with a ftps:// url')
                    # An explicit ftps:// url implies an encrypted data channel.
                    encrypt = True
                elif encrypt_data_channel:
                    raise SyntaxError('encrypt_data_channel=True cannot be used with a ftp:// url')
            elif encrypt_data_channel:
                file = 'ftps://' + file
            else:
                file = 'ftp://' + file
            normalized.append((file, encrypt))

        # Second pass: build the remote files.
        remote_files = []
        for file, encrypt in normalized:
            remote_files.append(super(RemoteProvider, self).remote(
                file, *args,
                encrypt_data_channel=encrypt,
                immediate_close=should_close,
                **kwargs))

        if len(remote_files) == 1:
            return remote_files[0]
        return remote_files
class RemoteObject(DomainObject):
    """A single file or directory on an FTP/FTPS server.

    Connections are made with ``ftputil.FTPHost``. Unless ``immediate_close``
    is set, one connection is cached on ``self.conn`` and reused across
    operations until :meth:`close` is called.
    """

    def __init__(self, *args, keep_local=False, provider=None,
                 encrypt_data_channel=False, immediate_close=False, **kwargs):
        super(RemoteObject, self).__init__(*args, keep_local=keep_local, provider=provider, **kwargs)
        # When truthy, ftpc() connects with ftplib.FTP_TLS and asks ftputil to
        # encrypt the data channel.
        self.encrypt_data_channel = encrypt_data_channel
        # When True, each ftpc() opens a fresh connection and closes it on exit.
        self.immediate_close = immediate_close

    @staticmethod
    def _close_connection(conn):
        """Close *conn*, ignoring errors (best-effort teardown)."""
        try:
            conn.keep_alive()
        except Exception:
            pass
        # Close even if keep_alive failed, so the socket is always released.
        try:
            conn.close()
        except Exception:
            pass

    @staticmethod
    def _try_synchronize_times(ftpc):
        """Best-effort server/client time-shift detection.

        ``synchronize_times`` requires write access on the server, so failures
        are ignored.
        """
        try:
            ftpc.synchronize_times()
        except Exception:
            pass

    def close(self):
        """Close the cached persistent connection, if there is one."""
        conn = getattr(self, "conn", None)
        if isinstance(conn, ftputil.FTPHost) and not self.immediate_close:
            self._close_connection(conn)
            # Forget the closed handle so a later ftpc() reconnects instead of
            # handing out a dead connection.
            self.conn = None

    # === Implementations of abstract class members ===

    @contextmanager
    def ftpc(self):
        """Yield an ``ftputil.FTPHost`` connected to this object's server.

        Connection settings are layered as follows:
          1. host and port from the file itself (port defaults to 21) and
             this object's ``encrypt_data_channel``;
          2. overridden by the kwargs given to ``RemoteProvider()``;
          3. overridden by the kwargs given to ``remote()``.

        Without ``immediate_close``, the connection is cached on ``self.conn``
        and reused. With it, a new connection is opened and closed when the
        ``with`` block exits, including when the block raises.
        """
        cached = getattr(self, "conn", None)
        if isinstance(cached, ftputil.FTPHost) and not self.immediate_close:
            yield cached
            return

        kwargs_to_use = {
            "host": self.host,
            "username": None,
            "password": None,
            "port": int(self.port) if self.port else 21,
            "encrypt_data_channel": self.encrypt_data_channel,
        }
        kwargs_to_use.update(self.provider.kwargs)
        kwargs_to_use.update(self.kwargs)

        ftp_base_class = ftplib.FTP_TLS if kwargs_to_use["encrypt_data_channel"] else ftplib.FTP
        ftp_session_factory = ftputil.session.session_factory(
            base_class=ftp_base_class,
            port=kwargs_to_use["port"],
            encrypt_data_channel=kwargs_to_use["encrypt_data_channel"],
            debug_level=None)

        conn = ftputil.FTPHost(kwargs_to_use["host"],
                               kwargs_to_use["username"],
                               kwargs_to_use["password"],
                               session_factory=ftp_session_factory)

        if not self.immediate_close:
            self.conn = conn
            yield conn
            return

        # Local-scope connection: close it whether or not the body raised.
        try:
            yield conn
        finally:
            self._close_connection(conn)

    def exists(self):
        """Return whether the remote path exists.

        Raises:
            FTPFileException: if the file could not be parsed as an FTP path.
        """
        if not self._matched_address:
            raise FTPFileException("The file cannot be parsed as an FTP path in form 'host:port/abs/path/to/file': %s" % self.local_file())
        with self.ftpc() as ftpc:
            return ftpc.path.exists(self.remote_path)

    def mtime(self):
        """Return the remote modification time via ``ftputil``'s ``path.getmtime``.

        Raises:
            FTPFileException: if the remote file does not exist.
        """
        if not self.exists():
            raise FTPFileException("The file does not seem to exist remotely: %s" % self.local_file())
        with self.ftpc() as ftpc:
            self._try_synchronize_times(ftpc)
            return ftpc.path.getmtime(self.remote_path)

    def size(self):
        """Return the remote size, or the local file's size if the remote file is missing."""
        if not self.exists():
            return self._iofile.size_local
        with self.ftpc() as ftpc:
            return ftpc.path.getsize(self.remote_path)

    def download(self, make_dest_dirs=True):
        """Download the remote file to ``self.local_path``.

        Args:
            make_dest_dirs: create the local parent directory if needed.

        Raises:
            FTPFileException: if the remote file does not exist.
        """
        with self.ftpc() as ftpc:
            if not self.exists():
                raise FTPFileException("The file does not seem to exist remotely: %s" % self.local_file())
            dest_dir = os.path.dirname(self.local_path)
            # os.makedirs("") raises, so skip when the target is in the cwd.
            if make_dest_dirs and dest_dir:
                os.makedirs(dest_dir, exist_ok=True)
            self._try_synchronize_times(ftpc)
            ftpc.download(source=self.remote_path, target=self.local_path)
            # Flush to disk; os.sync only exists on Unix.
            if hasattr(os, "sync"):
                os.sync()

    def upload(self):
        """Upload ``self.local_path`` to ``self.remote_path``."""
        with self.ftpc() as ftpc:
            # Best-effort, as in mtime/download: it needs write access to the
            # server's current directory, which the upload itself may not need.
            self._try_synchronize_times(ftpc)
            ftpc.upload(source=self.local_path, target=self.remote_path)

    @property
    def list(self):
        """Paths of all files and directories below this object's constant prefix.

        Returned paths are relative to the walk root and have one leading '/'
        stripped.
        """
        constant_prefix = self._iofile.constant_prefix()
        dirname = constant_prefix.replace(self.path_prefix, "")
        with self.ftpc() as ftpc:
            file_list = [
                os.path.join(dirpath, f) if dirpath != "." else f
                for dirpath, dirnames, filenames in ftpc.walk(dirname)
                for f in chain(filenames, dirnames)
            ]
        return [path[1:] if path.startswith("/") else path for path in file_list]