# Source listing: ubuntuone-client, file event_queue.py

# ubuntuone.syncdaemon.event_queue - Event queuing
#
# Author: Facundo Batista <facundo@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
'''Module that implements the Event Queue machinery.'''

import functools
import logging
import os
import re

import pyinotify
from twisted.internet import abstract, reactor, error, defer

from Queue import Queue, Empty

class InvalidEventError(Exception):
    '''Raised when a pushed event name is not one of the keys of EVENTS.'''

# Every event that may travel through the EventQueue, keyed by name.  The
# value is the ordered tuple of parameter names that a push of that event
# must provide (positionally and/or as keywords).
EVENTS = {
    # FS_* events
    'FS_FILE_CLOSE_WRITE': ('path',),
    'FS_FILE_CREATE': ('path',),
    'FS_FILE_DELETE': ('path',),
    'FS_FILE_MOVE': ('path_from', 'path_to'),
    'FS_DIR_CREATE': ('path',),
    'FS_DIR_DELETE': ('path',),
    'FS_DIR_MOVE': ('path_from', 'path_to'),

    # AQ_* events
    'AQ_FILE_NEW_OK': ('marker', 'new_id'),
    'AQ_FILE_NEW_ERROR': ('marker', 'error'),
    'AQ_DIR_NEW_OK': ('marker', 'new_id'),
    'AQ_DIR_NEW_ERROR': ('marker', 'error'),
    'AQ_MOVE_OK': ('share_id', 'node_id'),
    'AQ_MOVE_ERROR': (
        'share_id', 'node_id', 'old_parent_id', 'new_parent_id',
        'new_name', 'error',
    ),
    'AQ_UNLINK_OK': ('share_id', 'parent_id', 'node_id'),
    'AQ_UNLINK_ERROR': ('share_id', 'parent_id', 'node_id', 'error'),
    'AQ_DOWNLOAD_STARTED': ('share_id', 'node_id', 'server_hash'),
    'AQ_DOWNLOAD_FINISHED': ('share_id', 'node_id', 'server_hash'),
    'AQ_DOWNLOAD_ERROR': ('share_id', 'node_id', 'server_hash', 'error'),
    'AQ_UPLOAD_STARTED': ('share_id', 'node_id', 'hash'),
    'AQ_UPLOAD_FINISHED': ('share_id', 'node_id', 'hash'),
    'AQ_UPLOAD_ERROR': ('share_id', 'node_id', 'error', 'hash'),
    'AQ_SHARES_LIST': ('shares_list',),
    'AQ_LIST_SHARES_ERROR': ('error',),
    'AQ_CREATE_SHARE_OK': ('share_id', 'marker'),
    'AQ_CREATE_SHARE_ERROR': ('marker', 'error'),
    'AQ_QUERY_ERROR': ('item', 'error'),

    # SV_* events
    'SV_SHARE_CHANGED': ('message', 'info'),
    'SV_SHARE_ANSWERED': ('share_id', 'answer'),
    'SV_HASH_NEW': ('share_id', 'node_id', 'hash'),
    'SV_FILE_NEW': ('share_id', 'node_id', 'parent_id', 'name'),
    'SV_DIR_NEW': ('share_id', 'node_id', 'parent_id', 'name'),
    'SV_FILE_DELETED': ('share_id', 'node_id'),
    'SV_MOVED': (
        'share_id', 'node_id', 'new_share_id', 'new_parent_id', 'new_name',
    ),

    # HQ_* events
    'HQ_HASH_NEW': ('path', 'hash', 'crc32', 'size', 'stat'),

    # SYS_* events
    'SYS_CONNECT': ('access_token',),
    'SYS_DISCONNECT': (),
    'SYS_STATE_CHANGED': ('state',),
    'SYS_NET_CONNECTED': (),
    'SYS_NET_DISCONNECTED': (),
    'SYS_WAIT_FOR_LOCAL_RESCAN': (),
    'SYS_LOCAL_RESCAN_DONE': (),
    'SYS_CONNECTION_MADE': (),
    'SYS_CONNECTION_LOST': (),
    'SYS_PROTOCOL_VERSION_ERROR': ('error',),
    'SYS_PROTOCOL_VERSION_OK': (),
    'SYS_OAUTH_ERROR': ('error',),
    'SYS_OAUTH_OK': (),
    'SYS_SERVER_RESCAN_STARTING': (),
    'SYS_SERVER_RESCAN_DONE': (),
    'SYS_META_QUEUE_WAITING': (),
    'SYS_META_QUEUE_DONE': (),
    'SYS_CONTENT_QUEUE_WAITING': (),
    'SYS_CONTENT_QUEUE_DONE': (),
    'SYS_CLEANUP_STARTED': (),
    'SYS_CLEANUP_FINISHED': (),
    'SYS_UNKNOWN_ERROR': (),
}

# pyinotify changed where the event codes live and how an event flags a
# directory; pick the matching pair for the installed version
if 'IN_CREATE' not in vars(pyinotify.EventsCodes):
    # >= 0.8; event codes in pyinotify itself; events have dir attribute
    evtcodes = pyinotify

    def event_is_dir(event):
        '''Tell if the inotify event refers to a directory.'''
        return event.dir
else:
    # < 0.8; event codes in EventsCodes; events have is_dir attribute
    evtcodes = pyinotify.EventsCodes

    def event_is_dir(event):
        '''Tell if the inotify event refers to a directory.'''
        return event.is_dir

# maps an inotify event mask (the event code, or'ed with IN_ISDIR when the
# subject is a directory) to the name of our standard event
NAME_TRANSLATIONS = {
    # files
    evtcodes.IN_CLOSE_WRITE: 'FS_FILE_CLOSE_WRITE',
    evtcodes.IN_CREATE: 'FS_FILE_CREATE',
    evtcodes.IN_MOVED_TO: 'FS_FILE_CREATE',
    evtcodes.IN_DELETE: 'FS_FILE_DELETE',
    evtcodes.IN_MOVED_FROM: 'FS_FILE_DELETE',
    # directories
    evtcodes.IN_ISDIR | evtcodes.IN_CREATE: 'FS_DIR_CREATE',
    evtcodes.IN_ISDIR | evtcodes.IN_MOVED_TO: 'FS_DIR_CREATE',
    evtcodes.IN_ISDIR | evtcodes.IN_DELETE: 'FS_DIR_DELETE',
    evtcodes.IN_ISDIR | evtcodes.IN_MOVED_FROM: 'FS_DIR_DELETE',
}

# the mask used when adding an inotify watch: everything we listen to
INOTIFY_EVENTS = (
    evtcodes.IN_CREATE | evtcodes.IN_DELETE | evtcodes.IN_CLOSE_WRITE |
    evtcodes.IN_MOVED_FROM | evtcodes.IN_MOVED_TO | evtcodes.IN_MOVE_SELF
)

# name of the fallback method looked up on a listener that has no specific
# handle_<EVENT> method; it receives (event_name, *args, **kwargs)
DEFAULT_HANDLER = "handle_default"

class _INotifyProcessor(pyinotify.ProcessEvent):
    '''Helper class that is called from inotify when an event happens.

    This class also catches the MOVE events, and synthesizes a new
    FS_(DIR|FILE)_MOVE event when possible: an IN_MOVED_FROM is put on hold
    (for at most one second) waiting for an IN_MOVED_TO with the same cookie.
    '''
    def __init__(self, eq):
        self.log = logging.getLogger('ubuntuone.SyncDaemon.INotifyProcessor')
        self.eq = eq
        # the IN_MOVED_FROM event waiting for its matching IN_MOVED_TO
        self.held_event = None
        # the delayed call that releases held_event when it times out
        self.timer = None
        # the directory whose events are being swallowed (None: not frozen)
        self.frozen_path = None
        # False, or the last (evt_name, fullpath) swallowed while frozen
        self.frozen_evts = False

    def on_timeout(self):
        '''Called on timeout: release the held event, if still there.'''
        if self.held_event is not None:
            self.release_held_event(True)

    def release_held_event(self, timed_out=False):
        '''Release the event on hold to fulfill its destiny.

        When not called because of the timeout, the timer is cancelled
        first; if the timer was already called, nothing is done here.
        '''
        if not timed_out:
            try:
                self.timer.cancel()
            except error.AlreadyCalled:
                # self.on_timeout() was *just* called, do nothing here
                return
        self.push_event(self.held_event)
        self.held_event = None

    def process_IN_MOVE_SELF(self, event):
        '''Don't do anything here.

        We just turned this event on because pyinotify does some
        path-fixing in its internal processing when this happens
        '''

    def process_IN_MOVED_FROM(self, event):
        '''Capture the MOVED_FROM to maybe synthesize FILE_MOVED.'''
        if self.held_event is not None:
            self.release_held_event()

        # hold it for a second, waiting for the matching MOVED_TO
        self.held_event = event
        self.timer = reactor.callLater(1, self.on_timeout)

    def is_ignored(self, path):
        """Should we ignore this path?

        A path is ignored if any of its components ends with ".partial" or
        with ".conflict" (optionally followed by ".<number>"), or if the
        path itself is a symlink.
        """
        for part in path.split(os.path.sep):
            # test each component, not the whole path over and over
            if part.endswith(".partial") or \
                    re.search(r"\.conflict(?:\.\d+)?$", part):
                return True

        # don't support symlinks yet
        return os.path.islink(path)

    def process_IN_MOVED_TO(self, event):
        '''Capture the MOVED_TO to maybe synthesize FILE_MOVED.'''
        if self.held_event is not None:
            if event.cookie == self.held_event.cookie:
                # this is the other half of the held MOVED_FROM
                try:
                    self.timer.cancel()
                except error.AlreadyCalled:
                    # self.on_timeout() was *just* called, do nothing here
                    pass
                else:
                    f_path = os.path.join(self.held_event.path,
                                                        self.held_event.name)
                    t_path = os.path.join(event.path, event.name)

                    if not self.is_ignored(f_path) and \
                            not self.is_ignored(t_path):
                        f_share_id = self.eq.fs.get_by_path(\
                                             os.path.dirname(f_path)).share_id
                        t_share_id = self.eq.fs.get_by_path(\
                                             os.path.dirname(t_path)).share_id
                        evtname = "FS_DIR_" if event_is_dir(event) else "FS_FILE_"
                        if f_share_id != t_share_id:
                            # if the share_id are != push a delete/create
                            self.eq.push(evtname+"DELETE", f_path)
                            self.eq.push(evtname+"CREATE", t_path)
                        else:
                            self.eq.push(evtname+"MOVE", f_path, t_path)
                    self.held_event = None
                return
            else:
                # unrelated move: let the held one go, then this one
                self.release_held_event()
                self.push_event(event)
        else:
            # we don't have a held_event so this is a move from outside.
            # if it's a file move it's atomic on POSIX, so we aren't going to
            # receive a IN_CLOSE_WRITE, so let's fake it for files
            self.push_event(event)
            t_path = os.path.join(event.path, event.name)
            # push_event() drops ignored paths, so don't fake a close for
            # a file whose creation was never announced
            if not os.path.isdir(t_path) and not self.is_ignored(t_path):
                self.eq.push('FS_FILE_CLOSE_WRITE', t_path)

    def process_default(self, event):
        '''Push the event into the EventQueue.'''
        if self.held_event is not None:
            self.release_held_event()
        self.push_event(event)

    def push_event(self, event):
        '''Push the event to the EQ.

        The inotify event is translated to one of our FS_* events; it is
        dropped if its path is ignored, or swallowed (only remembered) if it
        happened directly inside the frozen path.  Raises KeyError if the
        event mask has no translation.
        '''
        # ignore this trash
        if event.mask == evtcodes.IN_IGNORED:
            return

        # change the pattern IN_CREATE to FS_FILE_CREATE or FS_DIR_CREATE
        try:
            evt_name = NAME_TRANSLATIONS[event.mask]
        except KeyError:
            raise KeyError("Unhandled Event in INotify: %s" % event)

        # push the event
        fullpath = os.path.join(event.path, event.name)

        # check if the path is not frozen
        if self.frozen_path is not None:
            if os.path.dirname(fullpath) == self.frozen_path:
                # this will at least store the last one, for debug
                # purposes
                self.frozen_evts = (evt_name, fullpath)
                return

        if not self.is_ignored(fullpath):
            if evt_name == 'FS_DIR_DELETE':
                self.handle_dir_delete(fullpath)
            self.eq.push(evt_name, fullpath)

    def freeze_begin(self, path):
        """Puts in hold all the events for this path."""
        self.log.debug("Freeze begin: %r", path)
        self.frozen_path = path
        self.frozen_evts = False

    def freeze_rollback(self):
        """Unfreezes the frozen path, resetting to idle state."""
        self.log.debug("Freeze rollback: %r", self.frozen_path)
        self.frozen_path = None
        self.frozen_evts = False

    def freeze_commit(self, events):
        """Unfreezes the frozen path, sending received events if not dirty.

        If events for that path happened:
            - return True (the path is left frozen, only the dirty mark
              is cleared)
        else:
            - push the here received events, return False

        'events' is a sequence of (event_name, path) pairs.
        """
        self.log.debug("Freeze commit: %r (%d events)",
                                                self.frozen_path, len(events))
        if self.frozen_evts:
            # ouch! we're dirty!
            self.log.debug("Dirty by %s", self.frozen_evts)
            self.frozen_evts = False
            return True

        # push the received events
        for evt_name, path in events:
            if not self.is_ignored(path):
                self.eq.push(evt_name, path)

        self.frozen_path = None
        self.frozen_evts = False
        return False

    def handle_dir_delete(self, fullpath):
        """Handle the case of moving a dir to a non-watched directory.

        Pushes a delete event for every known path below 'fullpath'
        (in reverse sorted order), but not for 'fullpath' itself.
        """
        paths = self.eq.fs.get_paths_starting_with(fullpath)
        paths.sort(reverse=True)
        for path, is_dir in paths:
            if path == fullpath:
                continue
            if is_dir:
                self.eq.push('FS_DIR_DELETE', path)
            else:
                self.eq.push('FS_FILE_DELETE', path)


class EventQueue(object):
    '''Manages the events from different sources and distributes them.'''

    def __init__(self, fs):
        self._listeners = []

        self.log = logging.getLogger('ubuntuone.SyncDaemon.EQ')
        self.fs = fs
        # hook inotify
        self._inotify_reader = None
        self._inotify_wm = wm = pyinotify.WatchManager()
        self._processor = _INotifyProcessor(self)
        self._inotify_notifier = pyinotify.Notifier(wm, self._processor)
        self._hook_inotify_to_twisted(wm, self._inotify_notifier)
        # watched dir path -> inotify watch descriptor
        self._watchs = {}
        # True while push() is draining dispatch_queue
        self.dispatching = False
        self.dispatch_queue = Queue()
        self.empty_event_queue_callbacks = set()

    def add_empty_event_queue_callback(self, callback):
        """Add a callback for when the event queue has no more events.

        If the queue is idle right now, the callback is also called
        immediately.
        """
        self.empty_event_queue_callbacks.add(callback)
        if not self.dispatching and self.dispatch_queue.empty():
            if callable(callback):
                callback()

    def remove_empty_event_queue_callback(self, callback):
        """Remove the callback (KeyError if it was not registered)."""
        self.empty_event_queue_callbacks.remove(callback)

    def _hook_inotify_to_twisted(self, wm, notifier):
        '''This will hook inotify to twisted.'''

        class MyReader(abstract.FileDescriptor):
            '''Chain between inotify and twisted.'''
            # will never pass a fd to write, pylint: disable-msg=W0223

            def fileno(self):
                '''Returns the fileno to select().'''
                return wm._fd

            def doRead(self):
                '''Called when twisted says there's something to read.'''
                notifier.read_events()
                notifier.process_events()

        reader = MyReader()
        self._inotify_reader = reader
        reactor.addReader(reader)

    def shutdown(self):
        '''Prepares the EQ to be closed.'''
        self._inotify_notifier.stop()
        reactor.removeReader(self._inotify_reader)

    def inotify_rm_watch(self, dirpath):
        '''Remove watch from a dir.

        Raises ValueError if the dir is not being watched, and RuntimeError
        if the watch manager reports that the removal failed.
        '''
        try:
            wd = self._watchs[dirpath]
        except KeyError:
            raise ValueError("The path %r is not watched right now!" % dirpath)
        result = self._inotify_wm.rm_watch(wd)
        if not result[wd]:
            raise RuntimeError("The path %r couldn't be removed!" % dirpath)
        del self._watchs[dirpath]

    def inotify_add_watch(self, dirpath):
        '''Add watch to a dir.'''
        self.log.debug("Adding inotify watch to %r", dirpath)
        result = self._inotify_wm.add_watch(dirpath, INOTIFY_EVENTS)
        self._watchs[dirpath] = result[dirpath]

    def unsubscribe(self, obj):
        '''Removes the callback object from the listener queue.

        @param obj: the callback object to remove from the queue.
        '''
        self._listeners.remove(obj)

    def subscribe(self, obj):
        '''Stores the callback object to whom push the events when received.

        @param obj: the callback object to add to the listener queue.

        These objects should provide a 'handle_FOO' to receive the FOO
        events (replace FOO with the desired event).
        '''
        if obj not in self._listeners:
            self._listeners.append(obj)

    def push(self, event_name, *args, **kwargs):
        '''Receives a push for all events.

        The signature for each event is forced on each method, not in this
        'push' arguments.

        The arguments are validated against EVENTS[event_name]: raises
        InvalidEventError for an unknown event name, and TypeError if there
        are too many positional arguments or if the keyword arguments are
        not exactly the parameters left after the positional ones.

        The event is queued; if no other push is dispatching, the queue is
        drained here and then the empty-queue callbacks are called.
        '''
        self.log.debug("push_event: %s, args:%s, kw:%s",
                     event_name, args, kwargs)
        # get the event parameters
        try:
            event_params = EVENTS[event_name]
        except KeyError:
            msg = "The received event_name (%r) is not valid!" % event_name
            self.log.error(msg)
            raise InvalidEventError(msg)

        # validate that the received arguments are ok
        if args:
            if len(args) > len(event_params):
                # event_params is a tuple: wrap it so that it's formatted
                # as a single value
                msg = "Too many arguments! (should receive %s)" % (
                                                            event_params,)
                self.log.error(msg)
                raise TypeError(msg)
            event_params = event_params[len(args):]

        s_eventparms = set(event_params)
        s_kwargs = set(kwargs.keys())
        if s_eventparms != s_kwargs:
            msg = "Wrong arguments for event %s (should receive %s, got %s)" \
                  % (event_name, event_params, kwargs.keys())
            self.log.error(msg)
            raise TypeError(msg)

        # queue the event; if we are already dispatching (this push was
        # triggered from a handler) the running loop will take care of it
        self.dispatch_queue.put((event_name, args, kwargs))
        if self.dispatching:
            return

        self.dispatching = True
        try:
            while True:
                try:
                    event_name, args, kwargs = \
                            self.dispatch_queue.get(block=False)
                except Empty:
                    break
                self._dispatch(event_name, *args, **kwargs)
        finally:
            # never leave the flag set, or no event would be dispatched
            # ever again
            self.dispatching = False

        for callback in self.empty_event_queue_callbacks.copy():
            callback()

    def _dispatch(self, event_name, *args, **kwargs):
        """Push the event to all listeners.

        Exceptions raised by a handler are logged and do not stop the
        delivery to the remaining listeners.
        """
        # check listeners to see if have the proper method, and call it
        meth_name = "handle_" + event_name
        for listener in self._listeners:
            # don't use hasattr because is expensive and
            # catch too many errors
            # we need to catch all here, pylint: disable-msg=W0703
            method = self._get_listener_method(listener, meth_name, event_name)
            if method is not None:
                try:
                    method(*args, **kwargs)
                except Exception:
                    self.log.exception("Error encountered while handling: %s"
                                     " in %s", event_name, listener)

    def _get_listener_method(self, listener, method_name, event_name):
        """Return the method named method_name, or the default handler
        (with event_name already bound as first argument), from the
        listener. Or None if neither is defined in the listener.
        """
        method = getattr(listener, method_name, None)
        if method is None:
            method = getattr(listener, DEFAULT_HANDLER, None)
            if method is not None:
                method = functools.partial(method, event_name)
        return method

    def freeze_begin(self, path):
        """Puts in hold all the events for this path.

        Raises ValueError if a path is already frozen.
        """
        if self._processor.frozen_path is not None:
            raise ValueError("There's something already frozen!")
        self._processor.freeze_begin(path)

    def freeze_rollback(self):
        """Unfreezes the frozen path, resetting to idle state.

        Raises ValueError if nothing is frozen.
        """
        if self._processor.frozen_path is None:
            raise ValueError("Rolling back with nothing frozen!")
        self._processor.freeze_rollback()

    def freeze_commit(self, events):
        """Unfreezes the frozen path, sending received events if not dirty.

        Returns a deferred that is fired with the processor's result:

        If events for that path happened:
            - True
        else:
            - the here received events are pushed, and the result is False

        Raises ValueError if nothing is frozen.
        """
        if self._processor.frozen_path is None:
            raise ValueError("Commiting with nothing frozen!")

        d = defer.execute(self._processor.freeze_commit, events)
        return d

# Generated by Doxygen 1.6.0