Logo Search packages:      
Sourcecode: ubuntuone-client version File versions

local_rescan.py

# ubuntuone.syncdaemon.local_rescan - local rescanning
#
# Author: Facundo Batista <facundo@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
'''Module that implements the Local Rescan.'''

import os
import functools
import logging
import errno

from twisted.internet import defer, reactor

class ScanTransactionDirty(Exception):
    '''The transaction was dirty.

    Raised while scanning a directory when the disk changed under the scan
    (a node disappeared during the comparison, or EQ reported the frozen
    transaction as dirty); the directory is queued to be scanned again.
    '''

class ScanNoDirectory(Exception):
    '''The whole directory went away.

    Raised when the directory being scanned no longer exists in disk
    (listing it fails with ENOENT); the scan logs it and moves on.
    '''

# Logger for the local rescan. Each log_<level> helper is Logger.log with the
# level already bound, so callers pass only (msg, *args).
lr_logger = logging.getLogger('ubuntuone.SyncDaemon.local_rescan')
log_info, log_debug, log_error, log_warning = (
    functools.partial(lr_logger.log, _level)
    for _level in (logging.INFO, logging.DEBUG,
                   logging.ERROR, logging.WARNING))

class LocalRescan(object):
    '''Local re-scanner.

    Compares the real disc with FSM's metadata, and pushes the changes to EQ.

    Scan requests are serialized: they are stored in ``self._queue`` and
    processed one directory at a time. Everybody asking for a scan while a
    run is in progress gets the same deferred (``self._previous_deferred``).
    That deferred fires when the queue is drained, or errbacks on an
    unexpected error.
    '''
    def __init__(self, vm, fsm, eq):
        self.vm = vm
        self.fsm = fsm
        self.eq = eq
        # pending scans, as (all_share_dirs, share_path, dir_path) tuples;
        # items are added with insert(0, ...) and taken with pop(): FIFO
        self._queue = []
        # deferred of the scan run in progress, None when idle
        self._previous_deferred = None

    def start(self):
        '''Start the comparison of every share with "Modify" access.

        Return the deferred that fires when all the queued scans are done.
        '''
        log_info("start scan all shares")
        for share in self._get_shares():
            all_share_dirs = self._get_share_dirs(share.id)
            self._queue.insert(0, (all_share_dirs, share.path, share.path))
        return self._queue_scan()

    def _get_shares(self, access_level="Modify"):
        '''Yield the shares of the volume manager with that access level.'''
        for sid in self.vm.shares:
            share = self.vm.shares[sid]
            if share.access_level == access_level:
                yield share

    def scan_dir(self, direct):
        '''Compares one directory between metadata and disk.

        Paths inside a read-only ("View") share, or that are not in disk,
        are ignored and None is returned. Otherwise the scan is queued and
        the deferred of the current scan run is returned.

        Raises ValueError if the path is in no share, or is not a directory.
        '''
        log_info("scan dir: %r", direct)

        # get the share to get only a subset of mdids
        for share in self._get_shares():
            if direct.startswith(share.path):
                break
        else:
            # not in RW shares; let's check RO shares, otherwise it's an error
            for share in self._get_shares("View"):
                if direct.startswith(share.path):
                    return
            log_error("The received path is not in any share!")
            raise ValueError("The received path is not in any share!")

        if not os.path.exists(direct):
            m = "The path is not in disk: %r" % direct
            log_warning(m)
            return

        if not os.path.isdir(direct):
            m = "The path is in disk but it's not a dir: %r" % direct
            log_error(m)
            raise ValueError(m)

        # No, 'share' is surely defined; pylint: disable-msg=W0631
        all_share_dirs = self._get_share_dirs(share.id)
        self._queue.insert(0, (all_share_dirs, share.path, direct))
        return self._queue_scan()

    def _queue_scan(self):
        '''If there's a scan in progress, queue the new one for later.

        Return the deferred of the current scan run, starting a new run if
        none is in progress. The deferred may already have fired if the
        queue was empty.
        '''
        deferred = self._previous_deferred
        if deferred is None:
            deferred = self._previous_deferred = defer.Deferred()
            # this may drain the queue and clear _previous_deferred right
            # away, so return the local reference, never the attribute
            self._process_next_queue(None)
        return deferred

    def _process_next_queue(self, _):
        '''Process the next item in the queue, if any.

        When the queue is empty, fire the run's deferred and go idle.
        Otherwise schedule the scan of the oldest queued item.
        '''
        log_debug("process next in queue (len %d)", len(self._queue))
        if not self._queue:
            # Detach before firing: a callback that asks for another scan
            # must start a new run, not queue under an already-fired deferred.
            deferred, self._previous_deferred = self._previous_deferred, None
            deferred.callback(None)
            return

        # more to scan
        scan_info = self._queue.pop()

        def abort(failure):
            '''Hand an unexpected scan error to whoever is waiting.'''
            # Items still queued stay there; the next _queue_scan() call
            # starts a new run that processes them.
            deferred, self._previous_deferred = self._previous_deferred, None
            deferred.errback(failure)

        def safe_scan():
            '''Scan, routing any sync or async failure to abort.'''
            d = defer.maybeDeferred(self._scan_tree, *scan_info)
            d.addErrback(abort)

        reactor.callLater(0, safe_scan)

    def _get_share_dirs(self, share_id):
        '''Get all the nodes of a share from FSM.

        Return a list of (path, is_dir, stat, changed, node_id) tuples.
        '''
        all_share_dirs = []
        for obj in self.fsm.get_mdobjs_by_share_id(share_id):
            changd = self.fsm.changed(mdid=obj.mdid)
            all_share_dirs.append(
                    (obj.path, obj.is_dir, obj.stat, changd, obj.node_id))
        return all_share_dirs

    def _scan_tree(self, all_share_dirs, share_path, path):
        '''Scans a whole tree, using the received path as root.

        Only ``path`` itself is compared here. Its subdirectories are queued,
        and after that the next queued item is processed. A dirty transaction
        re-queues ``path``; a vanished directory is logged and skipped. Other
        errors are logged and propagate through the returned deferred.
        '''
        log_debug("_scan_tree:  share_path: %r  path: %r", share_path, path)

        def go_deeper(newdirs):
            '''Explore into the subdirs.'''
            for direct in newdirs:
                log_debug("explore subdir: %r", direct)
                self._queue.insert(0, (all_share_dirs, share_path, direct))

        def re_launch(failure):
            '''Explore that directory again.'''
            if failure.check(ScanTransactionDirty):
                reason = failure.getErrorMessage()
                log_debug("re queue, transaction dirty for %r, reason: %s",
                                                                  path, reason)
                self._queue.insert(0, (all_share_dirs, share_path, path))
            elif failure.check(ScanNoDirectory):
                log_error("the directory dissappeared: %s (%s)", failure.type,
                                                                failure.value)
            else:
                log_error("in the scan: %s (%s)\n%s",
                          failure.type, failure.value, failure.getTraceback())
                return failure

        d = defer.succeed((all_share_dirs, share_path, path))
        d.addCallback(self._scan_one_dir)
        d.addCallbacks(go_deeper, re_launch)
        d.addCallback(self._process_next_queue)
        return d

    def _compare(self, dirpath, dirnames, filenames, all_share_dirs, shr_path):
        '''Compare the directories with the info that should be there.

        dirnames and filenames are what is in disk in dirpath. all_share_dirs
        is the share metadata (as built by _get_share_dirs), and shr_path is
        the share root. Metadata that can't be trusted is fixed in place
        (see despair).

        Return (events, to_scan_later): the (event_name, path) tuples to
        hand to EQ, and the subdirectories that must be scanned as well.
        '''
        log_debug("comparing directory %r", dirpath)

        # get the share info
        shouldbe = self._paths_filter(all_share_dirs, dirpath, len(shr_path))

        def despair(message, fullname, also_children=False, also_remove=None):
            '''Something went very bad with this node, converge!

            Move the node aside to "<fullname>.conflict" and drop its
            metadata. If asked, also drop its children's metadata and
            remove another file from disk.
            '''
            log_debug(message, fullname)
            try:
                os.rename(fullname, fullname + ".conflict")
            except OSError as e:
                m = "OSError %s when trying to move to conflict file/dir %r"
                log_warning(m, e, fullname)
            self.fsm.delete_metadata(fullname)

            # if asked, remove metadata for children
            if also_children:
                log_debug("Removing also metadata from %r children", fullname)
                for path, _ in self.fsm.get_paths_starting_with(fullname):
                    self.fsm.delete_metadata(path)

            # if asked, remove also that file
            if also_remove is not None:
                try:
                    os.remove(also_remove)
                except OSError as e:
                    m = "OSError %s when trying to remove file %r"
                    # report the file we failed to remove, not the node
                    log_warning(m, e, also_remove)

        def check_stat(fullname, statinfo):
            '''Generate event if stats differ.'''
            log_debug("comp yield STAT prv: %s", statinfo)
            newstat = os.stat(fullname)
            log_debug("comp yield STAT new: %s", newstat)
            if statinfo != newstat:
                events.append(('FS_FILE_CLOSE_WRITE', fullname))

        # check all directories
        to_scan_later = []
        events = []
        for dname in dirnames:
            fullname = os.path.join(dirpath, dname)
            if dname in shouldbe:
                is_dir, statinfo, changed = shouldbe.pop(dname)
                if not is_dir:
                    # it's there, but it's a file!
                    log_debug("comp yield: file %r became a dir!", fullname)
                    events.append(('FS_FILE_DELETE', fullname))
                    events.append(('FS_DIR_CREATE', fullname))
                else:
                    if changed == "SERVER":
                        # download interrupted
                        log_debug("comp yield: dir %r in SERVER", fullname)
                        mdobj = self.fsm.get_by_path(fullname)
                        self.fsm.set_by_mdid(mdobj.mdid,
                                             server_hash=mdobj.local_hash)
                        self.fsm.remove_partial(mdobj.node_id, mdobj.share_id)
                        to_scan_later.append(fullname)
                    elif changed == "NONE":
                        # it's old, we should scan it later
                        log_debug("comp yield: dir %r will be scaned later!",
                                                                      fullname)
                        to_scan_later.append(fullname)
                    else:
                        m = "Wrong 'changed' value for %r: " + changed
                        despair(m, fullname, also_children=True)

            else:
                # hey, it's new!
                log_debug("comp yield: directory %r is new!", fullname)
                events.append(('FS_DIR_CREATE', fullname))

        # check all files
        for fname in filenames:
            fullname = os.path.join(dirpath, fname)
            if fname in shouldbe:
                is_dir, statinfo, changed = shouldbe.pop(fname)
                if is_dir:
                    log_debug("comp yield: dir %r became a file!", fullname)
                    # it's there, but it's a directory!
                    events.append(('FS_DIR_DELETE', fullname))
                    events.append(('FS_FILE_CREATE', fullname))
                else:
                    if changed == "LOCAL":
                        # upload interrupted
                        log_debug("comp yield: file %r in LOCAL state!",
                                                                    fullname)
                        events.append(('FS_FILE_CLOSE_WRITE', fullname))
                    elif changed == "NONE":
                        # what about stat info?
                        log_debug("comp yield: file %r was here.. stat?",
                                                                    fullname)
                        check_stat(fullname, statinfo)
                    elif changed == "SERVER":
                        log_debug("comp yield: file %r in SERVER", fullname)
                        mdobj = self.fsm.get_by_path(fullname)
                        self.fsm.set_by_mdid(mdobj.mdid,
                                             server_hash=mdobj.local_hash)
                        self.fsm.remove_partial(mdobj.node_id, mdobj.share_id)
                        check_stat(fullname, statinfo)
                    else:
                        m = "Wrong 'changed' value for %r: " + changed
                        despair(m, fullname)

            else:
                if fname.endswith(".partial"):
                    # a partial file! it can be a standard file, or the one
                    # inside a dir (which will be deleted in that case)
                    realname = fname[:-8]
                    realfullname = fullname[:-8]
                    if realname not in shouldbe:
                        # this is the case of a .partial with no md at all!
                        m = "Found a .partial (%r) with no metadata, removing!"
                        log_debug(m, fullname)
                        os.remove(fullname)
                        continue

                    is_dir, statinfo, changed = shouldbe.pop(realname)
                    if is_dir:
                        m = ".partial of a file that MD says it's a dir: %r"
                        despair(m, realfullname, also_remove=fullname)
                    elif changed != "SERVER":
                        m = ".partial of a file that 'changed' != SERVER: %r"
                        despair(m, realfullname, also_remove=fullname)
                    else:
                        # download interrupted
                        m = "comp yield: file %r in SERVER state!"
                        log_debug(m, fullname)
                        mdobj = self.fsm.get_by_path(realfullname)
                        self.fsm.set_by_mdid(mdobj.mdid,
                                             server_hash=mdobj.local_hash)
                        self.fsm.remove_partial(mdobj.node_id, mdobj.share_id)
                        check_stat(fullname, statinfo)

                else:
                    # hey, it's new!
                    log_debug("comp yield: file %r is new!", fullname)
                    events.append(('FS_FILE_CREATE', fullname))

                    # if it's not empty, tell to hash it and uplod
                    if os.path.getsize(fullname):
                        events.append(('FS_FILE_CLOSE_WRITE', fullname))

        # all these don't exist anymore
        for name, (is_dir, statinfo, changed) in shouldbe.items():
            fullname = os.path.join(dirpath, name)
            if is_dir:
                if changed not in ("SERVER", "NONE"):
                    # bad metadata
                    m = "Bad 'changed': removing MD from dir %r and children"
                    log_debug(m, fullname)
                    children = self.fsm.get_paths_starting_with(fullname)
                    for path, _ in children:
                        self.fsm.delete_metadata(path)
                    continue

                log_debug("comp yield: directory %r is gone!", fullname)
                # it's a directory: inform the deletion of it and of
                # everything the metadata had inside it
                base_path = fullname[len(shr_path)+1:]
                events.extend(self._deleted_dir_events(all_share_dirs,
                                                       shr_path, base_path))
            else:
                if changed not in ("SERVER", "NONE", "LOCAL"):
                    # bad metadata
                    m = "Bad 'changed': removing MD from file %r"
                    log_debug(m, fullname)
                    self.fsm.delete_metadata(fullname)
                    continue

                log_debug("comp yield: file %r is gone!", fullname)
                events.append(('FS_FILE_DELETE', fullname))
        return events, to_scan_later

    def _deleted_dir_events(self, all_share_dirs, shr_path, base_path):
        '''Build the delete events for a vanished dir and all its content.

        base_path is the dir's path relative to the share root shr_path.
        Only the dir itself and entries *under* it match; a sibling that
        merely shares the name prefix (e.g. "foobar" for "foo") does not.
        Events are ordered from more path components to less, so children
        are always informed before their parents.
        '''
        prefix = base_path + os.path.sep
        to_inform = []
        for shrpath, is_dir, _, _, _ in all_share_dirs:
            if shrpath == base_path or shrpath.startswith(prefix):
                qparts = len(shrpath.split(os.path.sep))
                to_inform.append((qparts, shrpath, is_dir))
        to_inform.sort(reverse=True)

        events = []
        for _, relpath, is_dir in to_inform:
            event = 'FS_DIR_DELETE' if is_dir else 'FS_FILE_DELETE'
            events.append((event, os.path.join(shr_path, relpath)))
        return events

    def _paths_filter(self, all_share_dirs, dirpath, len_shr_path):
        '''Returns the paths that belong to this dir.

        Return a dict {name: (is_dir, stat, changed)} with the direct
        children of dirpath according to the metadata. Entries without
        node_id get their metadata deleted and are left out, so they are
        seen as new.
        '''
        # paths in shares are relative, remove the first slash
        direct = dirpath[len_shr_path + 1:]
        basedir = dirpath[:len_shr_path]

        # build the dict
        filesdirs = {}
        for shrpath, is_dir, statinfo, changed, node_id in all_share_dirs:
            base, fname = os.path.split(shrpath)
            if base == direct:
                # if without node_id, remove the metadata, and take it as new
                if node_id is None:
                    fullname = os.path.join(basedir, shrpath)
                    m = "Deleting metadata, because of node_id=None, of %r"
                    log_debug(m, fullname)
                    self.fsm.delete_metadata(fullname)
                    continue

                filesdirs[fname] = is_dir, statinfo, changed
        return filesdirs

    def _scan_one_dir(self, scan_info):
        '''Gets one dir and compares with fsm.

        scan_info is (all_share_dirs, share_path, dir_path). An inotify
        watch is added and EQ is frozen for dir_path while comparing. The
        resulting events go to eq.freeze_commit. The transaction is rolled
        back on any error, or if the commit reports it dirty.

        Return a deferred with the subdirectories to scan later.
        '''
        all_share_dirs, shr_path, dirpath = scan_info

        log_debug("Adding watch to %r", dirpath)
        self.eq.inotify_add_watch(dirpath)

        to_later = []
        self.eq.freeze_begin(dirpath)

        def read_disk():
            '''Split dirpath entries, symlinks excluded, in dirs and files.'''
            try:
                listdir = os.listdir(dirpath)
            except OSError as e:
                if e.errno == errno.ENOENT:
                    raise ScanNoDirectory("Directory %r disappeared since last"
                                          " time" % dirpath)
                raise

            dnames = []
            fnames = []
            for something in listdir:
                fullname = os.path.join(dirpath, something)
                # don't support symlinks yet
                if os.path.islink(fullname):
                    continue
                if os.path.isdir(fullname):
                    dnames.append(something)
                else:
                    fnames.append(something)
            return dnames, fnames

        def compare(dnames, fnames):
            '''Compare against the metadata; a vanished node means redo.'''
            try:
                return self._compare(dirpath, dnames, fnames,
                                     all_share_dirs, shr_path)
            except OSError as e:
                if e.errno == errno.ENOENT:
                    # something dissapeared from disk, start all over again
                    raise ScanTransactionDirty("The file/dir %r dissapeared" %
                                                                    e.filename)
                raise

        def scan():
            '''the scan, really'''
            log_debug("scanning the dir %r", dirpath)
            try:
                dnames, fnames = read_disk()
                events, to_scan_later = compare(dnames, fnames)
            except Exception:
                # whatever went wrong, never leave EQ frozen
                self.eq.freeze_rollback()
                raise
            to_later.extend(to_scan_later)
            return events

        def control(dirty):
            '''controls that everything was ok'''
            if dirty:
                self.eq.freeze_rollback()
                raise ScanTransactionDirty("dirty!")
            else:
                return to_later

        d = defer.execute(scan)
        d.addCallback(self.eq.freeze_commit)
        d.addCallback(control)
        return d

Generated by  Doxygen 1.6.0   Back to index