Logo Search packages:      
Sourcecode: ubuntuone-client version File versions

tools.py

# ubuntuone.syncdaemon.tools - tools for SyncDaemon
#
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
""" SyncDaemon Tools """
import logging
import time
import sys

from ubuntuone.syncdaemon.dbus_interface import (
    DBUS_IFACE_NAME,
    DBUS_IFACE_STATUS_NAME,
    DBUS_IFACE_SHARES_NAME,
    DBUS_IFACE_SYNC_NAME,
    DBUS_IFACE_FS_NAME,
)
from dbus.lowlevel import SignalMessage, MethodCallMessage, ErrorMessage
from dbus.exceptions import DBusException
from twisted.internet import defer, reactor


00035 class DBusClient(object):
    """ Low level dbus client. To help testing the DBus interface. """

00038     def __init__(self, bus, path, interface, destination=DBUS_IFACE_NAME):
        """ create the instance """
        self.bus = bus
        self.path = path
        self.interface = interface
        self.destination = destination

00045     def send_signal(self, signal, *args):
        """ Send method with *args """
        msg = SignalMessage(self.path, self.interface,
                            signal)
        msg.set_no_reply(True)
        msg.append(*args)
        self.bus.send_message(msg)

00053     def call_method(self, method, *args, **kwargs):
        """ Call method with *args and **kwargs over dbus"""
        msg = MethodCallMessage(self.destination, self.path, self.interface,
                                method)
        msg.set_no_reply(True)
        msg.append(*args)
        reply_handler = kwargs.get('reply_handler', None)
        error_handler = kwargs.get('error_handler', None)
        assert error_handler != None

        def parse_reply(message):
            """ handle the reply message"""
            if isinstance(message, ErrorMessage):
                return error_handler(DBusException(
                                    name=message.get_error_name(),
                                    *message.get_args_list()))
            args_list = message.get_args_list(utf8_strings=False,
                                                  byte_arrays=False)
            if reply_handler:
                if len(args_list) == 0:
                    reply_handler(None)
                elif len(args_list) == 1:
                    return reply_handler(args_list[0])
                else:
                    return reply_handler(tuple(args_list))
        return self.bus.send_message_with_reply(msg,
                                                reply_handler=parse_reply)


00082 class SyncDaemonTool(object):
    """ Various utility methods to test/play with the SyncDaemon. """

    def __init__(self, bus):
        self.bus = bus
        self.last_event = 0
        self.delayed_call = None
        self.log = logging.getLogger('ubuntuone.SyncDaemon.SDTool')

00091     def _get_dict(self, a_dict):
        """ Converts a dict returned by dbus to a dict of strings. """
        str_dict = {}
        for key in a_dict:
            str_dict[key] = unicode(a_dict[key])
        return str_dict

00098     def wait_connected(self):
        """ Wait until syncdaemon is connected to the server. """
        self.log.debug('wait_connected')
        d = defer.Deferred()
        def check_connection_status():
            """ Check if the daemon is up and running. """
            # check if the syncdaemon is running
            # catch all errors, pylint: disable-msg=W0703
            try:
                sync = self.bus.get_object(DBUS_IFACE_NAME, '/')
                self.log.debug('wait_connected: Done!')
                d.callback(True)
            except Exception, e:
                self.log.debug('Not connected: %s', e)
                d.errback()

        reactor.callLater(.5, check_connection_status)
        return d

00117     def get_current_downloads(self):
        """ Return a deferred that 'll be called with the list
        of current downloads
        """
        d = defer.Deferred()
        def current_downloads():
            """ Call Status.current_downloads """
            status_client = DBusClient(self.bus, '/status',
                                       DBUS_IFACE_STATUS_NAME)
            status_client.call_method('current_downloads',
                                      reply_handler=reply_handler,
                                      error_handler=d.errback)

        def reply_handler(downloads):
            """ current downloads callback """
            downloads_str = []
            for download in downloads:
                downloads_str.append(self._get_dict(download))
            d.callback(downloads_str)

        reactor.callLater(0, current_downloads)
        return d

00140     def wait_all_downloads(self, verbose=False):
        """ Wait until there is no more pending downloads """
        self.log.debug('wait_all_downloads')
        d = self.get_current_downloads()
        def reply_handler(downloads):
            """ Check if the are downloads in progress, and reschelude a
            new check if there is at least one.
            """
            if verbose:
                sys.stdout.write(', %s' % str(len(downloads)))
                sys.stdout.flush()
            if len(downloads) > 0:
                self.log.debug('wait_all_downloads: %d', len(downloads))
                return self.get_current_downloads()
            else:
                self.log.debug('wait_all_downloads: No more downloads')
                return True

        if verbose:
            sys.stdout.write('\nchecking current downloads')
            sys.stdout.flush()
        d.addCallback(reply_handler)
        return d

00164     def get_current_uploads(self):
        """ Return a deferred that 'll be called with the list
        of current uploads
        """
        d = defer.Deferred()
        def current_uploads():
            """ Call Status.current_uploads """
            status_client = DBusClient(self.bus, '/status',
                                       DBUS_IFACE_STATUS_NAME)
            status_client.call_method('current_uploads',
                                      reply_handler=reply_handler,
                                      error_handler=d.errback)

        def reply_handler(uploads):
            """ reply handler """
            uploads_str = []
            for upload in uploads:
                uploads_str.append(self._get_dict(upload))
            d.callback(uploads_str)

        reactor.callLater(0, current_uploads)
        return d

00187     def wait_all_uploads(self, verbose=False):
        """ Wait until there is no more pending uploads """
        self.log.debug('wait_all_uploads')
        d = self.get_current_uploads()

        def reply_handler(uploads):
            """ Check if the are downloads in progress, and reschelude a
            new check if there is at least one.
            """
            if verbose:
                sys.stdout.write(', %s' % str(len(uploads)))
                sys.stdout.flush()
            if len(uploads) > 0:
                self.log.debug('wait_all_uploads: %d', len(uploads))
                return self.get_current_uploads()
            else:
                self.log.debug('wait_all_uploads: No more uploads')
                return True

        if verbose:
            sys.stdout.write('\nchecking current uploads')
            sys.stdout.flush()

        d.addCallback(reply_handler)
        return d

00213     def wait_no_more_events(self, last_event_interval, verbose=False):
        """ Wait until no more events are fired by the syncdaemon. """
        self.log.debug('wait_no_more_events')
        d = defer.Deferred()
        def check_last_event():
            """ Check if the daemon is connected and we didn't received event
            in the last_event_interval
            """
            current_time = time.time()
            if self.last_event and \
               current_time - self.last_event < last_event_interval:
                # keep it running in case this is the last event
                self.log.debug('rescheduling wait_no_more_events')
                if not self.delayed_call.active():
                    self.delayed_call = reactor.callLater(last_event_interval,
                                                          check_last_event)
                else:
                    self.delayed_call.reset(last_event_interval)
            else:
                self.log.debug('wait_no_more_events: No more events!')
                d.callback(True)

        if verbose:
            sys.stdout.write("Listening events")
            sys.stdout.flush()
        def event_handler(event_dict):
            """ update last_event and run checks """
            self.last_event = time.time()
            self.log.debug('wait_no_more_events - new event: %s - %s',
                           event_dict['event_name'], str(self.last_event))
            if verbose:
                sys.stdout.write('.')
                sys.stdout.flush()
            if self.delayed_call.active():
                self.delayed_call.reset(last_event_interval)

        self.bus.add_signal_receiver(event_handler, signal_name='Event')
        def cleanup(result):
            """ remove the signal handler """
            self.bus.remove_signal_receiver(event_handler,
                                             signal_name='Event')
            return result
        d.addBoth(cleanup)
        # in case the daemon already reached nirvana
        self.delayed_call = reactor.callLater(last_event_interval,
                                              check_last_event)
        return d

00261     def wait_for_nirvana(self, last_event_interval=5, verbose=False):
        """ Wait until the syncdaemon reachs nirvana. This is when there are:
            - the syncdaemon is connected
            - 0 transfers inprogress
            - no more events are fired in the event queue
        @param last_event_interval: the seconds to wait to determine that there
        is no more events in the queue and the daemon reached nirvana
        """
        self.log.debug('wait_for_nirvana')
        sd_client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
        d = defer.Deferred()
        sd_client.call_method('wait_for_nirvana', last_event_interval,
                              reply_handler=d.callback,
                              error_handler=d.errback)
        return d

00277     def accept_share(self, share_id):
        """ Accept the share with id: share_id. """
        self.log.debug('accept_share(%s)', share_id)
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        shares_client.call_method('accept_share', share_id,
                                  reply_handler=d.callback,
                                  error_handler=d.errback)
        return d

00287     def reject_share(self, share_id):
        """ Reject the share with id: share_id. """
        self.log.debug('reject_share(%s)', share_id)
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        shares_client.call_method('reject_share', share_id,
                                  reply_handler=d.callback,
                                  error_handler=d.errback)
        return d

00297     def get_shares(self):
        """ Get the list of shares (accepted or not) """
        self.log.debug('get_shares')
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        def reply_handler(results):
            """ get_shares reply handler. """
            shares = []
            for result in results:
                shares.append(self._get_dict(result))
            self.log.debug('shares: %r', shares)
            d.callback(shares)

        shares_client.call_method('get_shares',
                                  reply_handler=reply_handler,
                                  error_handler=d.errback)
        return d

00315     def refresh_shares(self):
        """ Call refresh_shares method via DBus.
        Request a refresh of share list to the server.
        """
        self.log.debug('refresh_shares')
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        shares_client.call_method('refresh_shares',
                                  reply_handler=d.callback,
                                  error_handler=d.errback)
        return d

00327     def offer_share(self, path, username, name, access_level):
        """ Offer a share at the specified path to user with id: username. """
        self.log.debug('offer_share(%s, %s, %s, %s)',
                   path, username, name, access_level)
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        shares_client.call_method('create_share', path, username,
                                  name, access_level,
                                  reply_handler=d.callback,
                                  error_handler=d.errback)
        return d

00339     def list_shared(self):
        """ get the list of the shares "shared"/created/offered. """
        self.log.debug('list_shared')
        shares_client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        def reply_handler(results):
            """ get_shares reply handler. """
            shares = []
            for result in results:
                shares.append(self._get_dict(result))
            self.log.debug('shared: %r', shares)
            d.callback(shares)
        shares_client.call_method('get_shared',
                                  reply_handler=reply_handler,
                                  error_handler=d.errback)
        return d

00356     def query_by_path(self, path):
        """requesting a query of the node idetifiend by 'path' to the server."""
        self.log.debug('query_by_path(%s)', path)
        sd_client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
        d = defer.Deferred()
        sd_client.call_method('query_by_path', path,
                              reply_handler=d.callback,
                              error_handler=d.errback)
        return d

00366     def get_metadata(self, path):
        """ calls the exposed mtehod FileSystem.get_metadata using DBus. """
        self.log.debug('get_metadata(%s)', path)
        fs_client = DBusClient(self.bus, '/filesystem', DBUS_IFACE_FS_NAME)
        d = defer.Deferred()
        fs_client.call_method('get_metadata', path,
                              reply_handler=d.callback,
                              error_handler=d.errback)
        return d

Generated by  Doxygen 1.6.0   Back to index