# sync.py - ubuntuone.syncdaemon.sync

# ubuntuone.syncdaemon.sync - sync module
#
# Author: Lucio Torre <lucio.torre@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
"""This is the magic"""
from __future__ import with_statement

import errno
import logging
import os
import sys
from cStringIO import StringIO

from ubuntuone.storageprotocol import dircontent, hash
from ubuntuone.storageprotocol.dircontent_pb2 import DIRECTORY
from ubuntuone.syncdaemon import u1fsfsm
from ubuntuone.syncdaemon.filesystem_manager import InconsistencyError
from ubuntuone.syncdaemon.fsm.fsm import StateMachine, StateMachineRunner
from ubuntuone.syncdaemon.interfaces import IMarker
from ubuntuone.syncdaemon.marker import MDMarker
# content hash of empty data (factory with no update() calls)
empty_hash = hash.content_hash_factory().content_hash()

class FSKey(object):
    """Encapsulate getting/changing metadata addressed by different keys.

    The filesystem manager indexes metadata by path, by mdid, or by the
    (share_id, node_id) pair; instances hide which combination was given.
    """

    def __init__(self, fs, **keys):
        """Store the filesystem manager and the lookup keys."""
        self.fs = fs
        self.keys = keys

    def get_mdid(self):
        """Get the metadata id for the stored keys.

        Raises KeyError when the key combination is not one of the three
        supported ones, or no metadata can be found for it.
        """
        if len(self.keys) == 1 and "path" in self.keys:
            mdid = self.fs._idx_path[self.keys["path"]]
        elif len(self.keys) == 1 and "mdid" in self.keys:
            mdid = self.keys["mdid"]
        elif len(self.keys) == 2 and "node_id" in self.keys \
                    and "share_id" in self.keys:
            mdid = self.fs._idx_node_id[self.keys["share_id"],
                                        self.keys["node_id"]]
        else:
            raise KeyError("Incorrect keys: %s" % self.keys)
        if mdid is None:
            raise KeyError("cant find mdid")
        return mdid

    def get(self, key):
        """Get the value for key, deriving it when not stored directly."""
        mdid = self.get_mdid()
        if key == 'path':
            mdobj = self.fs.get_by_mdid(mdid)
            return self.fs.get_abspath(mdobj.share_id, mdobj.path)
        elif key == 'node_id':
            mdobj = self.fs.get_by_mdid(mdid)
            if mdobj.node_id is None:
                # not created on the server yet: hand out a marker instead
                return MDMarker(mdid)
            else:
                return mdobj.node_id
        elif key == 'parent_id':
            mdobj = self.fs.get_by_mdid(mdid)
            path = self.fs.get_abspath(mdobj.share_id, mdobj.path)
            parent_path = os.path.dirname(path)
            parent = self.fs.get_by_path(parent_path)
            return parent.node_id or MDMarker(parent.mdid)
        else:
            return getattr(self.fs.get_by_mdid(mdid), key, None)

    def __getitem__(self, key):
        """Get the value for key (dict-style access)."""
        return self.get(key)

    def set(self, **kwargs):
        """Set the given metadata values."""
        mdid = self.get_mdid()
        self.fs.set_by_mdid(mdid, **kwargs)

    def has_metadata(self):
        """The State Machine value version of has_metadata ('T'/'F'/'NA')."""
        try:
            return str(self.fs.has_metadata(**self.keys))[0]
        except (KeyError, TypeError):
            # wrong or insufficient keys: state is "not available"
            # (was "except KeyError, TypeError:", which only caught
            # KeyError and bound it to the name TypeError)
            return 'NA'

    def is_dir(self):
        """The State Machine value version of is_dir ('T'/'F'/'NA')."""
        try:
            return str(self.fs.is_dir(**self.keys))[0]
        except KeyError:
            return 'NA'

    def changed(self):
        """The State Machine value version of changed."""
        try:
            return self.fs.changed(**self.keys)
        except KeyError:
            return 'NA'

    def open_file(self):
        """Get the file object for reading."""
        mdid = self.get_mdid()
        try:
            fo = self.fs.open_file(mdid)
        except IOError:
            # this is a HUGE cheat: the state expects to start a download
            # but the file is gone, so to keep the transitions correct
            # we return an empty file; we will later receive the FS_DELETE
            return StringIO()
        return fo

    def upload_finished(self, server_hash):
        """Signal that we have uploaded the file."""
        mdid = self.get_mdid()
        self.fs.upload_finished(mdid, server_hash)

    def delete_file(self):
        """Delete the file and its metadata."""
        path = self["path"]
        self.fs.delete_file(path)

    def delete_metadata(self):
        """Delete the metadata only."""
        path = self["path"]
        self.fs.delete_metadata(path)

    def move_file(self, new_share_id, new_parent_id, new_name):
        """Move the local file to reflect a server-side move."""
        source_path = self['path']
        parent_path = self.fs.get_by_node_id(new_share_id, new_parent_id).path
        dest_path = os.path.join(
            self.fs.get_abspath(new_share_id, parent_path),
            new_name)
        self.fs.move_file(new_share_id, source_path, dest_path)

    def moved(self, new_share_id, path_to):
        """Change the metadata of a moved file."""
        self.fs.moved(new_share_id, self['path'], path_to)
        # keep our own lookup key pointing at the new location
        if "path" in self.keys:
            self.keys["path"] = path_to

    def remove_partial(self):
        """Remove a partial file; not having one is fine."""
        # pylint: disable-msg=W0704
        try:
            self.fs.remove_partial(self["node_id"], self["share_id"])
        except ValueError:
            # we had no partial, ignore
            pass

    def move_to_conflict(self):
        """Move the file aside to a conflict name."""
        self.fs.move_to_conflict(self.get_mdid())

    def refresh_stat(self):
        """Refresh the cached stat info."""
        path = self["path"]
        # pylint: disable-msg=W0704
        try:
            self.fs.refresh_stat(path)
        except OSError:
            # no file to stat, nothing to do
            pass

    def safe_get(self, key, default='^_^'):
        """Safe version of self.get, to be used in the FileLogger."""
        # catch all errors as we are here to help logging
        # pylint: disable-msg=W0703
        try:
            return self.get(key)
        except Exception:
            return default

    def make_file(self):
        """Create the local empty file."""
        self.fs.create_file(self.get_mdid())




def loglevel(lvl):
    """Make a method that logs through FileLogger.log at level lvl."""
    def level_log(self, message, *args, **kwargs):
        """Log message at the fixed level."""
        self.log(lvl, message, *args, **kwargs)
    return level_log


class FileLogger(object):
    """A logger that prefixes each message with the node's state."""

    def __init__(self, logger, key):
        """Wrap logger so messages carry info about key (an FSKey)."""
        self.logger = logger
        self.key = key

    def log(self, lvl, message, *args, **kwargs):
        """Log message at lvl, prefixed with the node's metadata state."""
        template = "%(hasmd)s:%(changed)s:%(isdir)s %(mdid)s "\
                   "[%(share_id)s:%(node_id)s] '%(path)r' | %(message)s"
        # logging accepts any truthy non-tuple exc_info and will call
        # sys.exc_info() itself, so passing the function works
        exc_info = sys.exc_info
        if self.key.has_metadata() == "T":
            # catch all errors as we are logging, pylint: disable-msg=W0703
            try:
                base = os.path.split(self.key.fs._get_share(
                    self.key['share_id']).path)[1]
                path = os.path.join(base, self.key.fs._share_relative_path(
                    self.key['share_id'], self.key['path']))
            except Exception:
                # error while getting the path
                self.logger.exception("Error in logger while building the "
                                      "relpath of: %r", self.key['path'])
                path = self.key.safe_get('path')
            extra = dict(message=message,
                         mdid=self.key.safe_get("mdid"),
                         path=path,
                         share_id=self.key.safe_get("share_id") or 'root',
                         node_id=self.key.safe_get("node_id"),
                         hasmd=self.key.has_metadata(),
                         isdir=self.key.is_dir(),
                         changed=self.key.changed())
        else:
            # no metadata: fill the placeholders, then show the raw keys
            extra = dict(message=message, mdid="-",
                         path="-",
                         share_id="-",
                         node_id="-",
                         hasmd="-",
                         isdir="-",
                         changed="-")
            extra.update(self.key.keys)
        message = template % extra
        if lvl == -1:
            # -1 is our private marker for "exception": error + traceback
            kwargs.update({'exc_info': exc_info})
            self.logger.error(message, *args, **kwargs)
        else:
            self.logger.log(lvl, message, *args, **kwargs)

    critical = loglevel(logging.CRITICAL)
    error = loglevel(logging.ERROR)
    warning = loglevel(logging.WARNING)
    info = loglevel(logging.INFO)
    debug = loglevel(logging.DEBUG)
    exception = loglevel(-1)

00260 class SyncStateMachineRunner(StateMachineRunner):
    """This is where all the state machine methods are."""

00263     def __init__(self, fsm, main, key, logger=None):
        """Create the runner."""
        super(SyncStateMachineRunner, self).__init__(fsm, logger)
        self.m = main
        self.key = key

00269     def signal_event_with_hash(self, event, hash, *args):
        """An event that takes a hash ocurred, build the params and signal."""
        self.on_event(event, self.build_hash_eq(hash), hash, *args)

00273     def build_hash_eq(self, hash):
        """Build the event params."""
        try:
            sh = str(self.key["server_hash"] == hash)[0]
            lh = str(self.key["local_hash"] == hash)[0]
        except KeyError:
            sh = lh = "NA"
        return dict(hash_eq_server_hash=sh, hash_eq_local_hash=lh)

00282     def signal_event_with_error_and_hash(self, event, error, hash, *args):
        """An event that takes a hash ocurred, build the params and signal."""
        params = self.build_error_eq(error)
        params.update(self.build_hash_eq(hash))
        self.on_event(event, params, error, hash, *args)

00288     def signal_event_with_error(self, event, error, *args):
        """An event returned with error."""
        params = self.build_error_eq(error)
        self.on_event(event, params, error, *args)

00293     def build_error_eq(self, error):
        """Get the error state."""
        return dict(not_available="F", not_authorized="F")

00297     def get_state_values(self):
        """Get the values for the current state."""
        return dict(
            has_metadata=self.key.has_metadata(),
            changed=self.key.changed(),
            is_directory=self.key.is_dir(),
        )

    # EVENT HANDLERS

00307     def nothing(self, event, params, *args):
        """pass"""
        pass

00311     def new_dir(self, event, params, share_id, node_id, parent_id, name):
        """create a local file."""
        mdobj = self.m.fs.get_by_node_id(share_id, parent_id)
        path = os.path.join(self.m.fs.get_abspath(share_id, mdobj.path), name)
        self.m.fs.create(path=path, share_id=share_id, is_dir=True)
        self.m.fs.set_node_id(path, node_id)
        self.m.action_q.query([(share_id, node_id, "")])
        # pylint: disable-msg=W0704
        # this should be provided by FSM, fix!!
        try:
            with self.m.fs._enable_share_write(share_id, self.key['path']):
                os.mkdir(self.key['path'])
        except OSError, e:
            if not e.errno == 17: #already exists
                raise
        else:
            try:
                # just add the watch
                # we hope the user wont have time to add a file just after
                # *we* created the directory
                # this is until we can solve and issue with LR and
                # new dirs and fast downloads
                # see bug #373940
                self.m.event_q.inotify_add_watch(path)
                #self.m.lr.scan_dir(path)
            except ValueError:
                pass #it was gone when lr got it


00340     def new_dir_on_server_with_local(self, event, params, share_id,
                                     node_id, parent_id, name):
        """new dir on server and we have a local file."""
        self.key.move_to_conflict()
        self.key.delete_metadata()
        self.new_dir(event, params, share_id, node_id, parent_id, name)

00347     def reget_dir(self, event, params, hash):
        """Reget the directory."""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
        self.m.action_q.query([(self.key['share_id'],
                                self.key['node_id'],
                                self.key['local_hash'] or "")])
        self.key.set(server_hash=self.key['local_hash'])

00357     def get_dir(self, event, params, hash):
        """Get the directory."""
        self.key.set(server_hash=hash)
        self.m.fs.create_partial(node_id=self.key['node_id'],
                                 share_id=self.key['share_id'])
        self.m.action_q.listdir(
            self.key['share_id'], self.key['node_id'], hash,
            lambda : self.m.fs.get_partial_for_writing(
                node_id=self.key['node_id'],
                share_id=self.key['share_id'])
            )

00369     def file_conflict(self, event, params, hash, crc32, size, stat):
        """This file is in conflict."""
        self.key.move_to_conflict()
        self.key.make_file()

00374     def local_file_conflict(self, event, params, hash):
        """This file is in conflict."""
        self.key.move_to_conflict()
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.make_file()
        self.get_file(event, params, hash)

00382     def merge_directory(self, event, params, hash):
        """Merge the server directory with the local one."""
        new_files = []
        new_dirs = []
        deleted_files = []
        deleted_dirs = []
        moved = set()

        try:
            fd = self.m.fs.get_partial(node_id=self.key['node_id'],
                            share_id=self.key['share_id'])
        except InconsistencyError:
            self.key.remove_partial()
            self.key.set(server_hash=self.key['local_hash'])
            self.m.action_q.query([
                (self.key["share_id"], self.key["node_id"], "")])
            # we dont perform the merge, we try to re get it
            return


        items = dircontent.parse_dir_content(fd)
        server_dir = [ (o.utf8_name, o.node_type == DIRECTORY, o.uuid)
                        for o in items ]
        client_dir = self.m.fs.dir_content(self.key['path'])
        # XXX: lucio.torre: with huge dirs, this could take a while

        share = self.key['share_id']
        for name, isdir, uuid in server_dir:
            # we took the name as bytes already encoded in utf8
            # directly from dircontent!
            try:
                md = self.m.fs.get_by_node_id(share, uuid)
            except KeyError:
                # not there, a new thing
                if isdir:
                    new_dirs.append((share, uuid, name))
                else:
                    new_files.append((share, uuid, name))
                continue
            mdpath = self.m.fs.get_abspath(md.share_id, md.path)
            if mdpath != os.path.join(self.key['path'], name):
                # this was moved
                # mark as moved
                moved.add(uuid)
                # signal moved
                self.m.event_q.push("SV_MOVED",
                    share_id=md.share_id, node_id=uuid,
                    new_share_id=share, new_parent_id=self.key['node_id'],
                    new_name=name)


        for name, isdir, uuid in client_dir:
            if uuid is None:
                continue

            if not (name, isdir, uuid) in server_dir:
                # not there, a its gone on the server
                if uuid in moved:
                    # this was a move, dont delete
                    continue
                if isdir:
                    deleted_dirs.append((share, uuid))
                else:
                    deleted_files.append((share, uuid))


        parent_uuid = self.key['node_id']
        for share, uuid, name in new_files:
            self.m.event_q.push("SV_FILE_NEW", parent_id=parent_uuid,
                                node_id=uuid, share_id=share, name=name)
        for share, uuid, name in new_dirs:
            self.m.event_q.push("SV_DIR_NEW", parent_id=parent_uuid,
                                node_id=uuid, share_id=share, name=name)
        for share, uuid in deleted_files:
            self.m.event_q.push("SV_FILE_DELETED",
                                node_id=uuid, share_id=share)
        for share, uuid in deleted_dirs:
            self.m.event_q.push("SV_FILE_DELETED",
                                node_id=uuid, share_id=share)

        self.key.remove_partial()
        self.key.set(local_hash=hash)

00465     def new_file(self, event, params, share_id, node_id, parent_id, name):
        """create a local file."""
        mdobj = self.m.fs.get_by_node_id(share_id, parent_id)
        path = os.path.join(self.m.fs.get_abspath(share_id, mdobj.path), name)
        self.m.fs.create(path=path, share_id=share_id, is_dir=False)
        self.m.fs.set_node_id(path, node_id)
        self.key.set(server_hash="")
        self.key.set(local_hash="")
        self.key.make_file()
        self.m.action_q.query([(share_id, node_id, "")])

00476     def new_file_on_server_with_local(self, event, params, share_id,
                                      node_id, parent_id, name):
        """move local file to conflict and re create"""
        self.key.move_to_conflict()
        self.key.delete_metadata()
        self.new_file(event, params, share_id, node_id, parent_id, name)

00483     def get_file(self, event, params, hash):
        """Get the contents for the file."""
        self.key.set(server_hash=hash)
        self.m.fs.create_partial(node_id=self.key['node_id'],
                                 share_id=self.key['share_id'])
        self.m.action_q.download(
            share_id=self.key['share_id'], node_id=self.key['node_id'],
            server_hash=hash,
            fileobj_factory=lambda: self.m.fs.get_partial_for_writing(
                node_id=self.key['node_id'],
                share_id=self.key['share_id'])
            )

00496     def reget_file(self, event, params, hash):
        """cancel and reget this download."""
        self.key.set(server_hash=hash)
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
        self.get_file(event, params, hash)

00504     def client_moved(self, event, params, path_from, path_to):
        """the client moved a file"""
        parent_path = os.path.dirname(path_from)
        old_parent = FSKey(self.m.fs, path=parent_path)
        old_parent_id = old_parent['node_id']
        new_path = os.path.dirname(path_to)
        new_name = os.path.basename(path_to)
        new_parent = FSKey(self.m.fs, path=new_path)
        new_parent_id = new_parent['node_id']

        self.m.action_q.move(share_id=self.key['share_id'],
            node_id=self.key['node_id'], old_parent_id=old_parent_id,
            new_parent_id=new_parent_id, new_name=new_name)
        self.key.moved(self.key['share_id'], path_to)

        # this is cheating, we change the state of another node
        if not IMarker.providedBy(old_parent_id):
            share_id = self.key['share_id']
            self.m.action_q.cancel_download(share_id, old_parent_id)
            old_parent.remove_partial()
            self.m.fs.set_by_node_id(old_parent_id, share_id,
                                     server_hash="", local_hash="")
            self.m.action_q.query([(share_id, old_parent_id, "")])
        self.m.hash_q.insert(self.key['path'])


00530     def server_file_changed_back(self, event, params, hash):
        """cancel and dont reget this download."""
        self.key.set(server_hash=hash)
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()

00537     def commit_file(self, event, params, hash):
        """commit the new content."""
        try:
            self.m.fs.commit_partial(
                            self.key['node_id'], self.key['share_id'], hash)
        except InconsistencyError:
            # someone or something broke out partials.
            # start work to go to a good state
            self.key.remove_partial()
            self.key.set(server_hash=self.key['local_hash'])
            self.m.action_q.query([
                (self.key["share_id"], self.key["node_id"], "")])

00550     def new_local_file(self, event, parms, path):
        """a new local file was created"""
        # XXX: lucio.torre: we should use markers here
        parent_path = os.path.dirname(path)
        parent = self.m.fs.get_by_path(parent_path)
        parent_id = parent.node_id or MDMarker(parent.mdid)
        share_id = parent.share_id
        self.m.fs.create(path=path, share_id=share_id, is_dir=False)
        self.key.set(local_hash=empty_hash)
        self.key.set(server_hash=empty_hash)
        name = os.path.basename(path)
        marker = MDMarker(self.key.get_mdid())
        self.m.action_q.make_file(share_id, parent_id, name, marker)

00564     def new_local_file_created(self, event, parms, new_id):
        """we got the server answer for the file creation."""
        self.m.fs.set_node_id(self.key['path'], new_id)


00569     def new_local_dir(self, event, parms, path):
        """a new local dir was created"""
        # XXX: lucio.torre: we should use markers here
        parent_path = os.path.dirname(path)
        parent = self.m.fs.get_by_path(parent_path)
        parent_id = parent.node_id or MDMarker(parent.mdid)
        share_id = parent.share_id
        self.m.fs.create(path=path, share_id=share_id, is_dir=True)
        name = os.path.basename(path)
        marker = MDMarker(self.key.get_mdid())
        self.m.action_q.make_dir(share_id, parent_id, name, marker)
        # pylint: disable-msg=W0704
        try:
            self.m.lr.scan_dir(path)
        except ValueError:
            pass #it was gone when lr got it

00586     def new_local_dir_created(self, event, parms, new_id):
        """we got the server answer for dir creation"""
        self.m.fs.set_node_id(self.key['path'], new_id)

00590     def calculate_hash(self, event, params):
        """calculate the hash of this."""
        self.m.hash_q.insert(self.key['path'])

00594     def put_file(self, event, params, hash, crc32, size, stat):
        """upload the file to the server."""
        previous_hash = self.key['server_hash']
        self.key.set(local_hash=hash)
        self.m.fs.update_stat(self.key.get_mdid(), stat)
        self.m.action_q.upload(share_id=self.key['share_id'],
            node_id=self.key['node_id'], previous_hash=previous_hash,
            hash=hash, crc32=crc32, size=size,
            fileobj_factory=self.key.open_file)

00604     def converges_to_server(self, event, params, hash, crc32, size, stat):
        """the local changes now match the server"""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                    node_id=self.key['node_id'])
        self.key.remove_partial()
        self.key.set(local_hash=hash)
        self.m.fs.update_stat(self.key.get_mdid(), stat)

00612     def reput_file_from_ok(self, event, param, hash):
        """put the file again, mark upload as ok"""
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.set(local_hash=hash)
        self.key.set(server_hash=hash)
        self.m.hash_q.insert(self.key['path'])


00621     def reput_file(self, event, param, hash, crc32, size, stat):
        """put the file again."""
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        previous_hash = self.key['server_hash']

        self.key.set(local_hash=hash)
        self.m.fs.update_stat(self.key.get_mdid(), stat)
        self.m.action_q.upload(share_id=self.key['share_id'],
            node_id=self.key['node_id'], previous_hash=previous_hash,
            hash=hash, crc32=crc32, size=size,
            fileobj_factory=self.key.open_file)

00634     def server_file_now_matches(self, event, params, hash):
        """We got a server hash that matches local hash"""
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.set(server_hash=hash)

00640     def commit_upload(self, event, params, hash):
        """Finish an upload."""
        self.key.upload_finished(hash)

00644     def cancel_and_commit(self, event, params, hash):
        """Finish an upload."""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
        self.key.upload_finished(hash)


00652     def delete_file(self, event, params):
        """server file was deleted."""
        try:
            self.key.delete_file()
        except OSError, e:
            if e.errno == 39:
                # if directory not empty
                self.key.move_to_conflict()
                self.key.delete_metadata()
            elif e.errno == 2:
                # file gone
                pass
            else:
                raise e

00667     def conflict_and_delete(self, event, params, *args, **kwargs):
        """move to conflict and delete file."""
        self.key.move_to_conflict()
        self.key.delete_metadata()

00672     def file_gone_wile_downloading(self, event, params):
        """a file we were downloading is gone."""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
        self.delete_file(event, params)

00679     def file_not_created_remove(self, event, params, error):
        """kill it"""
        self.key.move_to_conflict()
        self.key.delete_metadata()

00684     def delete_on_server(self, event, params, path):
        """local file was deleted."""
        self.m.action_q.unlink(self.key['share_id'],
                               self.key['parent_id'],
                               self.key['node_id'])
        self.key.delete_metadata()

00691     def deleted_dir_while_downloading(self, event, params, path):
        """kill it"""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                                        node_id=self.key['node_id'])
        self.key.remove_partial()
        self.m.action_q.unlink(self.key['share_id'],
                               self.key['parent_id'],
                               self.key['node_id'])
        self.key.delete_metadata()

00701     def cancel_download_and_delete_on_server(self, event, params, path):
        """cancel_download_and_delete_on_server"""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                                        node_id=self.key['node_id'])
        self.key.remove_partial()
        self.m.action_q.unlink(self.key['share_id'],
                               self.key['parent_id'],
                               self.key['node_id'])
        self.key.delete_metadata()

00711     def cancel_upload_and_delete_on_server(self, event, params, path):
        """cancel_download_and_delete_on_server"""
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
                                      node_id=self.key['node_id'])
        self.m.action_q.unlink(self.key['share_id'],
                               self.key['parent_id'],
                               self.key['node_id'])
        self.key.delete_metadata()


00721     def server_moved(self, event, params, share_id, node_id,
                     new_share_id, new_parent_id, new_name):
        """file was moved on the server"""
        self.key.move_file(new_share_id, new_parent_id, new_name)

00726     def server_moved_dirty(self, event, params, share_id, node_id,
                     new_share_id, new_parent_id, new_name):
        """file was moved on the server while downloading it"""
        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
        self.key.move_file(new_share_id, new_parent_id, new_name)
        self.get_file(event, params, self.key['server_hash'])

00735     def moved_dirty_local(self, event, params, path_from, path_to):
        """file was moved while uploading it"""
        self.m.action_q.cancel_upload(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.set(local_hash=self.key['server_hash'])
        self.client_moved(event, params, path_from, path_to)
        self.m.hash_q.insert(self.key['path'])


00744     def moved_dirty_server(self, event, params, path_from, path_to):
        """file was moved while downloading it"""
        self.client_moved(event, params, path_from, path_to)

        self.m.action_q.cancel_download(share_id=self.key['share_id'],
                            node_id=self.key['node_id'])
        self.key.remove_partial()
        self.key.set(server_hash=self.key['local_hash'])
        self.m.action_q.query([(self.key['share_id'],
                                self.key['node_id'],
                                self.key['local_hash'] or "")])

00756     def DESPAIR(self, event, params, *args, **kwargs):
        """if we got here, we are in trouble"""
        self.log.error("DESPAIR on event=%s params=%s args=%s kwargs=%s",
                                                event, params, args, kwargs)

    def save_stat(self, event, params, hash, crc32, size, stat):
        self.m.fs.update_stat(self.key.get_mdid(), stat)


class Sync(object):
    """Translates from EQ events into state machine events."""
    # XXX: lucio.torre:
    # this will need some refactoring once we handle more events

00770     def __init__(self, main):
        """create"""
        self.logger = logging.getLogger('ubuntuone.SyncDaemon.sync')
        self.fsm = StateMachine(u1fsfsm.state_machine)
        self.m = main
        self.m.event_q.subscribe(self)

00777     def handle_SV_HASH_NEW(self, share_id, node_id, hash):
        """on SV_HASH_NEW"""
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.signal_event_with_hash("SV_HASH_NEW", hash)

00784     def handle_SV_FILE_NEW(self, share_id, node_id, parent_id, name):
        """on SV_FILE_NEW"""
        parent = FSKey(self.m.fs, share_id=share_id, node_id=parent_id)
        path = os.path.join(parent["path"], name)
        key = FSKey(self.m.fs, path=path)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("SV_FILE_NEW", {}, share_id, node_id, parent_id, name)

00793     def handle_SV_DIR_NEW(self, share_id, node_id, parent_id, name):
        """on SV_DIR_NEW"""
        parent = FSKey(self.m.fs, share_id=share_id, node_id=parent_id)
        path = os.path.join(parent["path"], name)
        key = FSKey(self.m.fs, path=path)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("SV_DIR_NEW", {}, share_id, node_id, parent_id, name)

00802     def handle_SV_FILE_DELETED(self, share_id, node_id):
        """on SV_FILE_DELETED"""
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("SV_FILE_DELETED", {})

00809     def handle_AQ_DOWNLOAD_FINISHED(self, share_id, node_id, server_hash):
        """on AQ_DOWNLOAD_FINISHED"""
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.signal_event_with_hash("AQ_DOWNLOAD_FINISHED", server_hash)

00816     def handle_AQ_DOWNLOAD_ERROR(self, *args, **kwargs):
        """on AQ_DOWNLOAD_ERROR"""
        # for now we pass. later we will feed this into the state machine
        pass

00821     def handle_FS_FILE_CREATE(self, path):
        """on FS_FILE_CREATE"""
        key = FSKey(self.m.fs, path=path)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("FS_FILE_CREATE", {}, path)

00828     def handle_FS_DIR_CREATE(self, path):
        """on FS_DIR_CREATE"""
        key = FSKey(self.m.fs, path=path)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("FS_DIR_CREATE", {}, path)

00835     def handle_FS_FILE_DELETE(self, path):
        """on FS_FILE_DELETE"""
        key = FSKey(self.m.fs, path=path)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("FS_FILE_DELETE", {}, path)

00842     def handle_FS_DIR_DELETE(self, path):
        """on FS_DIR_DELETE"""
        key = FSKey(self.m.fs, path=path)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("FS_DIR_DELETE", {}, path)

00849     def handle_FS_FILE_MOVE(self, path_from, path_to):
        """on FS_FILE_MOVE"""
        key = FSKey(self.m.fs, path=path_from)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("FS_FILE_MOVE", {}, path_from, path_to)
    handle_FS_DIR_MOVE = handle_FS_FILE_MOVE

00857     def handle_AQ_FILE_NEW_OK(self, marker, new_id):
        """on AQ_FILE_NEW_OK"""
        key = FSKey(self.m.fs, mdid=marker)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("AQ_FILE_NEW_OK", {}, new_id)

00864     def handle_AQ_FILE_NEW_ERROR(self, marker, error):
        """on AQ_FILE_NEW_ERROR"""
        key = FSKey(self.m.fs, mdid=marker)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.signal_event_with_error("AQ_FILE_NEW_ERROR", error)

00871     def handle_AQ_DIR_NEW_ERROR(self, marker, error):
        """on AQ_DIR_NEW_ERROR"""
        key = FSKey(self.m.fs, mdid=marker)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.signal_event_with_error("AQ_DIR_NEW_ERROR", error)

00878     def handle_AQ_DIR_NEW_OK(self, marker, new_id):
        """on AQ_DIR_NEW_OK"""
        key = FSKey(self.m.fs, mdid=marker)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("AQ_DIR_NEW_OK", {}, new_id)

00885     def handle_FS_FILE_CLOSE_WRITE(self, path):
        """on FS_FILE_CLOSE_WRITE"""
        key = FSKey(self.m.fs, path=path)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event('FS_FILE_CLOSE_WRITE', {})

00892     def handle_HQ_HASH_NEW(self, path, hash, crc32, size, stat):
        """on HQ_HASH_NEW"""
        key = FSKey(self.m.fs, path=path)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.signal_event_with_hash("HQ_HASH_NEW", hash, crc32, size, stat)

00899     def handle_AQ_UPLOAD_FINISHED(self, share_id, node_id, hash):
        """on AQ_UPLOAD_FINISHED"""
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.signal_event_with_hash("AQ_UPLOAD_FINISHED", hash)

00906     def handle_AQ_UPLOAD_ERROR(self, share_id, node_id, error, hash):
        """on AQ_UPLOAD_FINISHED"""
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.signal_event_with_error_and_hash("AQ_UPLOAD_ERROR", error, hash)

00913     def handle_SV_MOVED(self, share_id, node_id, new_share_id, new_parent_id,
                        new_name):
        """on SV_MOVED"""
        key = FSKey(self.m.fs, share_id=share_id, node_id=node_id)
        log = FileLogger(self.logger, key)
        ssmr = SyncStateMachineRunner(self.fsm, self.m, key, log)
        ssmr.on_event("SV_MOVED", {}, share_id, node_id, new_share_id,
                      new_parent_id, new_name)

Generated by  Doxygen 1.6.0   Back to index