# Source listing: ubuntuone-client -- testcase.py

#
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
""" Base tests cases and test utilities """
import dbus
import dbus.service
import dbus.mainloop.glib
import logging
import os
import shutil
from warnings import warn


from ubuntuone.syncdaemon import (
    action_queue,
    event_queue,
    filesystem_manager as fs_manager,
    interfaces,
    volume_manager,
    main,
)
from ubuntuone.syncdaemon.dbus_interface import (
    DBusInterface,
    DBusExposedObject,
    NM_STATE_CONNECTED,
    NM_STATE_DISCONNECTED,
)
from twisted.internet import defer, reactor
from twisted.trial.unittest import TestCase as TwistedTestCase
from zope.interface import implements

class FakeOAuthClient(object):
    """Fake OAuthClient that always hands out the same canned token.

    Attributes set by the constructor:
        realm: the realm value passed in by the caller.
        consumer: always the string 'ubuntuone'.
    """

    def __init__(self, realm):
        """Create the instance, remembering `realm`."""
        self.realm = realm
        self.consumer = 'ubuntuone'

    def get_access_token(self):
        """Return the fixed placeholder token 'a token'."""
        return 'a token'


class FakeHashQueue(object):
    """A fake hash queue that is permanently empty."""

    def empty(self):
        """Report whether the queue is empty; this fake always is."""
        return True

    def __len__(self):
        """Return the number of queued items, which is always 0."""
        return 0


class FakeMain(main.Main):
    """A fake Main class used to set up the tests.

    It builds a VolumeManager, FileSystemManager, EventQueue and
    SyncDaemonStateManager, and plugs in FakeActionQueue, FakeHashQueue
    and FakeOAuthClient as the remaining collaborators.
    """

    # Main.__init__ is deliberately not called: this class creates the fake
    # main and all of its attributes itself.
    # pylint: disable-msg=W0231
    def __init__(self, root_dir, shares_dir, data_dir):
        """Create the instance.

        `root_dir`, `shares_dir` and `data_dir` are stored as attributes of
        the same name; `data_dir` is also handed to the FileSystemManager.
        """
        self.logger = logging.getLogger('ubuntuone.SyncDaemon.FakeMain')
        self.root_dir = root_dir
        self.data_dir = data_dir
        self.shares_dir = shares_dir
        self.realm = 'http://test.ubuntuone.com'
        self.oauth_client = FakeOAuthClient(self.realm)
        # The collaborators below receive `self` (or each other), so the
        # attributes above must already be in place and the creation order
        # must be preserved.
        self.vm = volume_manager.VolumeManager(self)
        self.fs = fs_manager.FileSystemManager(self.data_dir, self.vm)
        self.event_q = event_queue.EventQueue(self.fs)
        self.action_q = FakeActionQueue(self.event_q)
        self.state = main.SyncDaemonStateManager(self)
        self.event_q.subscribe(self.vm)
        self.vm.init_root()
        self.hash_q = FakeHashQueue()

    def _connect_aq(self, token):
        """Connect the fake action queue; `token` is accepted but unused."""
        self.action_q.connect()

    def _disconnect_aq(self):
        """Disconnect the fake action queue."""
        self.action_q.disconnect()

    def shutdown(self):
        """Shut down the created objects (only the event queue needs it)."""
        self.event_q.shutdown()

    def get_access_token(self):
        """Fake token lookup: always returns None."""
        return None

    def check_version(self):
        """Pretend the protocol version check passed.

        Pushes 'SYS_PROTOCOL_VERSION_OK' without contacting any server.
        """
        self.event_q.push('SYS_PROTOCOL_VERSION_OK')

    def authenticate(self):
        """Pretend the OAuth dance succeeded by pushing 'SYS_OAUTH_OK'."""
        self.event_q.push('SYS_OAUTH_OK')

    def get_root(self, root_mdid):
        """Fake asking the AQ for our root's uuid.

        `root_mdid` is ignored; returns an already-fired deferred whose
        result is 'root_uuid'.
        """
        return defer.succeed('root_uuid')

    def server_rescan(self):
        """Fake the server rescan.

        Pushes the STARTING and DONE events back to back and returns an
        already-fired deferred whose result is 'root_uuid'.
        """
        self.event_q.push('SYS_SERVER_RESCAN_STARTING')
        self.event_q.push('SYS_SERVER_RESCAN_DONE')
        return defer.succeed('root_uuid')



class BaseTwistedTestCase(TwistedTestCase):
    """Base TestCase that provides temporary-directory helpers.

    mktemp(name): create (or recreate) a named temporary dir
    tmpdir: per-test base directory for those temporary dirs
    rmtree(path) / makedirs(path): variants that cope with read-only dirs
    """

    # Root temp dir of the running test, set as a side effect of reading
    # ``tmpdir``.  It is declared here so rmtree() and tearDown() can test
    # it directly: ``self.__root`` is stored under its name-mangled form,
    # so a string lookup such as ``hasattr(self, '__root')`` never matches.
    __root = None

    def mktemp(self, name='temp'):
        """Customized mktemp that accepts an optional name argument.

        Any existing directory with that name is removed first.  Returns
        the path of the freshly created directory.
        """
        tempdir = os.path.join(self.tmpdir, name)
        if os.path.exists(tempdir):
            self.rmtree(tempdir)
        self.makedirs(tempdir)
        return tempdir

    @property
    def tmpdir(self):
        """Default tmpdir: 'module name'/'class name'/'test name'/tmpdir.

        Reading this property also records the test's root temp dir (the
        parent of the returned path) for later cleanup.  Nothing is
        created on disk here.
        """
        MAX_FILENAME = 32 # some platforms limit lengths of filenames
        base = os.path.join(self.__class__.__module__[:MAX_FILENAME],
                            self.__class__.__name__[:MAX_FILENAME],
                            self._testMethodName[:MAX_FILENAME])
        # use the _trial_temp dir, it should be os.getcwd()
        self.__root = os.path.join(os.getcwd(), base)
        return os.path.join(self.__root, 'tmpdir')

    def rmtree(self, path):
        """rmtree that handles read-only parent(s) and children."""
        # change perms to rw, so we can delete the temp dir; the parent of
        # the test's own root dir is left untouched
        if path != self.__root:
            os.chmod(os.path.dirname(path), 0o755)
        os.chmod(path, 0o755)
        # os.walk is top-down, so each directory is made accessible before
        # the walk descends into it
        for dirpath, dirnames, _ in os.walk(path):
            for dirname in dirnames:
                os.chmod(os.path.join(dirpath, dirname), 0o777)
        shutil.rmtree(path)

    def makedirs(self, path):
        """makedirs that handles a read-only parent."""
        parent = os.path.dirname(path)
        if os.path.exists(parent):
            os.chmod(parent, 0o755)
        os.makedirs(path)

    def tearDown(self):
        """Clean up the temp dir, if this test ever created one."""
        # tmpdir only computes the path, so the root may never have been
        # created on disk (or may already be gone)
        if self.__root is not None and os.path.exists(self.__root):
            self.rmtree(self.__root)
        return TwistedTestCase.tearDown(self)


class DBusTwistedTestCase(BaseTwistedTestCase):
    """Test the DBus event handling.

    setUp builds a FakeMain on fresh temporary directories and exposes it
    through a DBusInterface on a private bus connection, together with a
    FakeNetworkManager; tearDown undoes all of that.
    """

    def setUp(self):
        """Set up the infrastructure for the test (dbus service)."""
        BaseTwistedTestCase.setUp(self)
        self.log = logging.getLogger("ubuntuone.SyncDaemon.TEST")
        self.log.info("starting test %s.%s", self.__class__.__name__,
                      self._testMethodName)
        self.timeout = 2
        self.data_dir = self.mktemp('data_dir')
        self.root_dir = self.mktemp('root_dir')
        self.shares_dir = self.mktemp('shares_dir')
        self.main = FakeMain(self.root_dir, self.shares_dir, self.data_dir)
        self.fs_manager = self.main.fs
        self.event_q = self.main.event_q
        self.action_q = self.main.action_q
        self.loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.bus = dbus.bus.BusConnection(mainloop=self.loop)
        self.nm = FakeNetworkManager(self.bus)
        # monkeypatch busName.__del__ to avoid errors on gc
        # we take care of releasing the name in shutdown
        dbus.service.BusName.__del__ = lambda _: None
        # the same connection is used as both session and system bus
        self.dbus_iface = DBusInterface(self.bus, self.main,
                                        system_bus=self.bus)
        self.busName = self.dbus_iface.busName
        self.bus.set_exit_on_disconnect(False)
        self.dbus_iface.connect()
        self.event_q.push('SYS_WAIT_FOR_LOCAL_RESCAN')
        self.event_q.push('SYS_LOCAL_RESCAN_DONE')
        # match rules registered by tests; removed again in tearDown
        self.signal_receivers = set()

    def tearDown(self):
        """Clean up the test.

        Returns a deferred: the signal receivers are removed first and the
        real shutdown (_tearDown) runs afterwards, whatever the outcome.
        """
        d = self.cleanup_signal_receivers()
        d.addBoth(self._tearDown)
        return d

    def _tearDown(self, *args):
        """Shut everything down and remove the temporary directories.

        `args` absorbs the result passed along by the deferred chain.
        """
        self.main.shutdown()
        self.dbus_iface.shutdown()
        self.nm.shutdown()
        self.dbus_iface.bus.flush()
        self.bus.flush()
        self.bus.close()
        self.rmtree(self.shares_dir)
        self.rmtree(self.root_dir)
        self.rmtree(self.data_dir)
        self.log.info("finished test %s.%s", self.__class__.__name__,
                      self._testMethodName)
        return BaseTwistedTestCase.tearDown(self)

    def error_handler(self, *error):
        """Default error handler for DBus calls: fail the test."""
        self.fail(error)

    @staticmethod
    def _reply_handler_for(d):
        """Return a reply handler that fires `d` with the reply args tuple."""
        def callback(*args):
            """Callback that accepts *args."""
            d.callback(args)
        return callback

    def cleanup_signal_receivers(self):
        """Remove every match in self.signal_receivers; return a deferred.

        One asynchronous 'RemoveMatch' call is issued per receiver.  The
        result is a DeferredList over the per-call deferreds, or an
        already-fired deferred when there is nothing to remove.
        """
        deferreds = []
        for match in self.signal_receivers:
            rule = str(match)
            d = defer.Deferred()
            # The replies arrive after this loop has finished, so each
            # handler must be bound to its own deferred here; a closure
            # over the loop variable would fire only the last one.
            self.bus.call_async(dbus.bus.BUS_DAEMON_NAME,
                                dbus.bus.BUS_DAEMON_PATH,
                                dbus.bus.BUS_DAEMON_IFACE, 'RemoveMatch', 's',
                                (rule,), self._reply_handler_for(d),
                                self.error_handler)
            deferreds.append(d)
        if deferreds:
            return defer.DeferredList(deferreds)
        return defer.succeed(True)



class FakeActionQueue(object):
    """Stub implementation of interfaces.IActionQueue.

    Most operations do nothing.  connect() pushes 'SYS_CONNECTION_MADE' to
    the event queue and answer_share() returns a deferred that fires a
    little later.
    """

    implements(interfaces.IActionQueue)

    def __init__(self, eq, *args, **kwargs):
        """Create the instance.

        `eq` is the event queue that connect() pushes to; any extra
        positional or keyword arguments are accepted and ignored.
        """
        self.eq = eq
        self.client = action_queue.ActionQueueProtocol()
        self.uploading = {}
        self.downloading = {}
        class UUID_Map(object):
            """mock uuid map"""
            def set(self, *args):
                """mock set method"""
                pass

        self.uuid_map = UUID_Map()
        self.content_queue = action_queue.ContentQueue('CONTENT', self)
        self.meta_queue = action_queue.MetaQueue('META', self)

    def connect(self, host=None, port=None, user_ssl=False):
        """Stub: ignore the arguments and push 'SYS_CONNECTION_MADE'."""
        self.eq.push('SYS_CONNECTION_MADE')

    def disconnect(self):
        """Stub implementation: does nothing."""
        pass

    def cancel_download(self, share_id, node_id):
        """Stub implementation: does nothing."""
        pass

    def cancel_upload(self, share_id, node_id):
        """Stub implementation: does nothing."""
        pass

    def download(self, share_id, node_id, server_hash, fileobj):
        """Stub implementation: does nothing."""
        pass

    def upload(self, share_id, node_id, previous_hash, hash, crc32,
               size, deflated_size, fileobj):
        """Stub implementation: does nothing."""
        pass

    def make_file(self, share_id, parent_id, name, marker):
        """Stub implementation: does nothing."""
        pass

    def make_dir(self, share_id, parent_id, name, marker):
        """Stub implementation: does nothing."""
        pass

    def move(self, share_id, node_id, old_parent_id, new_parent_id, new_name):
        """Stub implementation: does nothing."""
        pass

    def unlink(self, share_id, node_id):
        """Stub implementation: does nothing."""
        pass

    def query(self, items):
        """Stub implementation: does nothing."""
        pass

    def listdir(self, share_id, node_id, server_hash, fileobj):
        """Stub implementation: does nothing."""
        pass

    def list_shares(self):
        """Stub implementation: does nothing."""
        pass

    def answer_share(self, share_id, answer):
        """Stub: return a deferred that fires with True after 0.2 seconds."""
        d = defer.Deferred()
        reactor.callLater(0.2, d.callback, True)
        return d

    def create_share(self, *args):
        """Stub implementation: does nothing."""
        pass


class MementoHandler(logging.Handler):
    """A logging handler that stores every record it receives in a list.

    The records are kept, in arrival order, in the `records` attribute.
    """

    def __init__(self, *args, **kwargs):
        """Create the instance and add an empty `records` list.

        All arguments are forwarded unchanged to logging.Handler.
        """
        logging.Handler.__init__(self, *args, **kwargs)
        self.records = []

    def emit(self, record):
        """Just append the record to self.records."""
        self.records.append(record)


class FakeNetworkManager(DBusExposedObject):
    """A fake NetworkManager that only emits the StateChanged signal.

    It claims the 'org.freedesktop.NetworkManager' name on the given bus
    and answers property Get calls from its own attributes.
    """

    # value handed out by Get() when the 'State' property is requested
    State = 3

    def __init__(self, bus):
        """Create the instance and claim the NetworkManager bus name."""
        self.path = '/org/freedesktop/NetworkManager'
        self.bus = bus
        self.bus.request_name('org.freedesktop.NetworkManager',
                              flags=dbus.bus.NAME_FLAG_REPLACE_EXISTING |
                              dbus.bus.NAME_FLAG_DO_NOT_QUEUE |
                              dbus.bus.NAME_FLAG_ALLOW_REPLACEMENT)
        self.busName = dbus.service.BusName('org.freedesktop.NetworkManager',
                                            bus=self.bus)
        DBusExposedObject.__init__(self, bus_name=self.busName,
                                   path=self.path)

    def shutdown(self):
        """Shut down the fake NetworkManager.

        Releases the bus name and removes the object from the connection.
        """
        self.busName.get_bus().release_name(self.busName.get_name())
        self.remove_from_connection()

    @dbus.service.signal('org.freedesktop.NetworkManager', signature='i')
    def StateChanged(self, state):
        """Fire the DBus signal StateChanged; the decorator does the work."""
        pass

    def emit_connected(self):
        """Emit the signal StateChanged(NM_STATE_CONNECTED)."""
        self.StateChanged(NM_STATE_CONNECTED)

    def emit_disconnected(self):
        """Emit the signal StateChanged(NM_STATE_DISCONNECTED)."""
        self.StateChanged(NM_STATE_DISCONNECTED)

    @dbus.service.method(dbus.PROPERTIES_IFACE,
                         in_signature='ss', out_signature='v',
                         async_callbacks=('reply_handler', 'error_handler'))
    def Get(self, interface, propname, reply_handler=None, error_handler=None):
        """Fake dbus's Get method to get at the State property.

        Replies with the attribute of this object named `propname` (None
        when there is no such attribute); `interface` is ignored.  Any
        exception raised while replying is passed to `error_handler`.
        """
        try:
            reply_handler(getattr(self, propname, None))
        except Exception as e: # pylint: disable-msg=W0703
            error_handler(e)

class FakeVolumeManager(object):
    """A volume manager that initially only knows one share, the root."""

    def __init__(self, root_path):
        """Create the instance.

        The root share is built from `root_path` with 'Modify' access and
        registered in `shares` under the empty-string key.
        """
        self.root = volume_manager.Share(root_path, access_level='Modify')
        self.shares = {'': self.root}
        self.log = logging.getLogger('ubuntuone.SyncDaemon.VM-test')

    def add_share(self, share):
        """Add `share` to the shares dict, keyed by its id.

        The share's directory is created when missing, and made read-only
        (mode 0555) when the share cannot be written to.
        """
        self.shares[share.id] = share
        # if the share doesn't exist, create it
        if not os.path.exists(share.path):
            os.mkdir(share.path)
        # if it's a ro share, change the perms
        if not share.can_write():
            os.chmod(share.path, 0o555)

    def on_server_root(self, _):
        """
        Do nothing
        """


# Listing generated by Doxygen 1.6.0.