Logo Search packages:      
Sourcecode: ubuntuone-client version File versions

test_dbus.py

# -*- coding: utf-8 -*-
#
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
""" Tests for the DBUS interface """
import os
import unittest

from ubuntuone.storageprotocol.sharersp import (
    NotifyShareHolder,
)
from ubuntuone.syncdaemon.dbus_interface import (
    DBUS_IFACE_STATUS_NAME,
    DBUS_IFACE_EVENTS_NAME,
    DBUS_IFACE_FS_NAME,
    DBUS_IFACE_SYNC_NAME,
    DBUS_IFACE_SHARES_NAME,
)
from ubuntuone.syncdaemon.volume_manager import Share
from ubuntuone.syncdaemon.tools import DBusClient
from ubuntuone.syncdaemon import event_queue
from ubuntuone.syncdaemon import states
from contrib.testing.testcase import (
    DBusTwistedTestCase,
)
from twisted.internet import defer
from ubuntuone.syncdaemon.marker import MDMarker


00043 class DBusInterfaceTests(DBusTwistedTestCase):
    """ Basic tests to the objects exposed with D-Bus"""

00046     def test_status(self):
        """ Test the Status Object, registering it to the session bus, and
        calling current_uploads method.
        """
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)
        d = defer.Deferred()
        def status_handler(result):
            """ reply handler """
            state = states.IDLE
            self.assertEquals(state.name, result['name'])
            self.assertEquals(state.description, result['description'])
            self.assertEquals(state.is_error, bool(result['is_error']))
            self.assertEquals(state.is_connected, bool(result['is_connected']))
            self.assertEquals(state.is_online, bool(result['is_online']))
            d.callback(True)

        def downloads_handler(result):
            """ reply handler """
            self.assertEquals(0, len(result))
            client.call_method('current_status',
                               reply_handler=status_handler,
                               error_handler=self.error_handler)

        def uploads_handler(result):
            """ reply handler """
            self.assertEquals(0, len(result))
            client.call_method('current_downloads',
                               reply_handler=downloads_handler,
                               error_handler=self.error_handler)

        client.call_method('current_uploads', reply_handler=uploads_handler,
                           error_handler=self.error_handler)
        return d

00080     def test_syncDaemon(self):
        """ Test the SyncDaemon object, registering it to the session bus, and
        calling connect and disconnect methods.
        """
        client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
        d = defer.Deferred()
        def disconnect_handler(result):
            """ reply handler """
            self.assertEquals(None, result)
            d.callback(True)

        def connect_handler(result):
            """ reply handler """
            self.assertEquals(None, result)
            client.call_method("disconnect", reply_handler=disconnect_handler,
                               error_handler=self.error_handler)

        client.call_method("connect", reply_handler=connect_handler,
                           error_handler=self.error_handler)

        return d

00102     def test_filesystem(self):
        """ Test the FileSystem Object, registering it to the session bus, and
        excercising the API.
        """
        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(share_path, share_id='share'))
        path = os.path.join(share_path, "foo")
        self.fs_manager.create(path, "share")
        self.fs_manager.set_node_id(path, "node_id")
        d = defer.Deferred()
        def handler(metadata):
            """ reply handler """
            d.callback(metadata)

        def callback(result):
            """ callback to check the result. """
            self.assertEquals('foo', str(result['path']))
            self.assertEquals('share', result['share_id'])
            self.assertEquals('node_id', result['node_id'])

        d.addCallback(callback)
        client = DBusClient(self.bus, '/filesystem', DBUS_IFACE_FS_NAME)
        client.call_method('get_metadata', path, reply_handler=handler,
                           error_handler=self.error_handler)
        return d

00128     def test_push_event(self):
        """ Test the exposed method: push_event """
        d = defer.Deferred()
        class Listener(object):
            """ A basic listener to handle the pushed event. """

            def handle_FS_FILE_CREATE(self, path):
                """ FS_FILE_CREATE handling """
                self.assertEquals('bar', path)

        listener = Listener()
        self.event_q.subscribe(listener)
        self.event_q.unsubscribe(self.dbus_iface.event_listener)

        def empty_queue_callback():
            """
            Called when the event queue empties
            """
            d.callback(True)

        def default(path):
            """ default callback to assert the pushed event. """
            self.event_q.unsubscribe(listener)
            self.event_q.subscribe(self.dbus_iface.event_listener)
            self.event_q.remove_empty_event_queue_callback(empty_queue_callback)

        d.addCallback(default)

        client = DBusClient(self.bus, '/events',
                   DBUS_IFACE_EVENTS_NAME)
        client.call_method('push_event', 'FS_FILE_CREATE', ('bar',),
                          error_handler=self.error_handler)
        self.event_q.add_empty_event_queue_callback(empty_queue_callback)
        return d


00164     def test_current_downloads(self):
        """ Test Status.current_downloads with fake data in the AQ """
        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(share_path, share_id='share_id'))
        down_path = os.path.join(share_path, "down_path")
        self.fs_manager.create(down_path, "share_id")
        self.fs_manager.set_node_id(down_path, "node_id")
        self.action_q.downloading[('share_id', 'node_id')] = dict(
            deflated_size=10, size=100, n_bytes_read=1)
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)
        d = defer.Deferred()
        def downloads_handler(result):
            """ reply handler """
            self.assertEquals(1, len(result))
            self.assertEquals(down_path, str(result[0]['path']))
            self.assertEquals('10', str(result[0]['deflated_size']))
            self.assertEquals('1', str(result[0]['n_bytes_read']))
            d.callback(True)

        client.call_method('current_downloads',
                           reply_handler=downloads_handler,
                           error_handler=self.error_handler)
        return d

00188     def test_current_uploads(self):
        """ Test Status.current_uploads with fake data in the AQ """
        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(share_path, share_id='share_id'))
        up_path = os.path.join(share_path, "up_path")
        self.fs_manager.create(up_path, "share_id")
        self.fs_manager.set_node_id(up_path, "node_id")
        self.action_q.uploading[('share_id', 'node_id')] = dict(
            deflated_size=100, n_bytes_written=10)
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)
        d = defer.Deferred()

        def uploads_handler(result):
            """ reply handler """
            self.assertEquals(1, len(result))
            self.assertEquals(up_path, str(result[0]['path']))
            self.assertEquals('100', str(result[0]['deflated_size']))
            self.assertEquals('10', str(result[0]['n_bytes_written']))
            d.callback(True)

        client.call_method('current_uploads', reply_handler=uploads_handler,
                           error_handler=self.error_handler)
        return d

00212     def test_current_uploads_with_marker(self):
        """ Test Status.current_uploads with fake data in the AQ """
        self.action_q.uploading[('', MDMarker('a_marker_uuid'))] = dict(
            deflated_size=100, n_bytes_written=10)
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)
        d = defer.Deferred()

        def uploads_handler(result):
            """ reply handler """
            self.assertEquals(0, len(result))
            d.callback(True)

        client.call_method('current_uploads', reply_handler=uploads_handler,
                           error_handler=self.error_handler)
        return d

00228     def test_two_current_downloads(self):
        """ Test Status.current_downloads with fake data in the AQ """
        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(share_path, share_id='share_id'))
        down_path = os.path.join(share_path, "down_path")
        self.fs_manager.create(down_path, "share_id")
        self.fs_manager.set_node_id(down_path, "node_id")
        self.action_q.downloading[('share_id', 'node_id')] = dict(
            deflated_size=10, size=100, n_bytes_read=8)
        share1_path = os.path.join(self.shares_dir, 'share1')
        self.main.vm.add_share(Share(share1_path, share_id='share_id_1'))
        down_path_1 = os.path.join(share1_path, "down_path_1")
        self.fs_manager.create(down_path_1, "share_id_1")
        self.fs_manager.set_node_id(down_path_1, "node_id_1")
        self.action_q.downloading[('share_id_1', 'node_id_1')] = dict(
            deflated_size=10, size=100, n_bytes_read=5)
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)
        d = defer.Deferred()
        def downloads_handler(result):
            """ reply handler """
            self.assertEquals(2, len(result))
            self.assertEquals(down_path, str(result[0]['path']))
            self.assertEquals('10', str(result[0]['deflated_size']))
            self.assertEquals('8', str(result[0]['n_bytes_read']))
            self.assertEquals(down_path_1, str(result[1]['path']))
            self.assertEquals('10', str(result[1]['deflated_size']))
            self.assertEquals('5', str(result[1]['n_bytes_read']))
            d.callback(True)

        client.call_method('current_downloads',
                           reply_handler=downloads_handler,
                           error_handler=self.error_handler)
        return d

00262     def test_two_current_uploads(self):
        """ Test Status.current_uploads with fake data in the AQ """
        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(share_path, share_id='share_id'))
        up_path = os.path.join(share_path, "up_path")
        self.fs_manager.create(up_path, "share_id")
        self.fs_manager.set_node_id(up_path, "node_id")
        self.action_q.uploading[('share_id', 'node_id')] = dict(
            deflated_size=100, n_bytes_written=10)
        share1_path = os.path.join(self.shares_dir, 'share1')
        self.main.vm.add_share(Share(share1_path, share_id='share_id_1'))
        up_path_1 = os.path.join(share1_path, "up_path_1")
        self.fs_manager.create(up_path_1, "share_id_1")
        self.fs_manager.set_node_id(up_path_1, "node_id_1")
        self.action_q.uploading[('share_id_1', 'node_id_1')] = dict(
            deflated_size=80, n_bytes_written=20)
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)
        d = defer.Deferred()

        def uploads_handler(result):
            """ reply handler """
            self.assertEquals(2, len(result))
            self.assertEquals(up_path, str(result[0]['path']))
            self.assertEquals('100', str(result[0]['deflated_size']))
            self.assertEquals('10', str(result[0]['n_bytes_written']))
            self.assertEquals(up_path_1, str(result[1]['path']))
            self.assertEquals('80', str(result[1]['deflated_size']))
            self.assertEquals('20', str(result[1]['n_bytes_written']))
            d.callback(True)

        client.call_method('current_uploads', reply_handler=uploads_handler,
                           error_handler=self.error_handler)
        return d

00296     def test_nm_signals(self):
        """ Test that NM signals are received and handled properly. """
        result = None
        d = defer.Deferred()
        # helper class, pylint: disable-msg=C0111
        # class-closure, cannot use self, pylint: disable-msg=E0213
        class Listener(object):
            def handle_SYS_NET_CONNECTED(innerself):
                self.nm.emit_disconnected()
            def handle_SYS_NET_DISCONNECTED(innerself):
                self.assertTrue(result)

        listener = Listener()
        self.event_q.subscribe(listener)

        def empty_queue_callback():
            d.callback(True)

        def callback(result):
            self.event_q.unsubscribe(listener)
            self.event_q.remove_empty_event_queue_callback(empty_queue_callback)
        d.addCallback(callback)

        self.nm.emit_connected()
        self.event_q.add_empty_event_queue_callback(empty_queue_callback)
        return d

00323     def test_get_shares(self):
        """ Test Shares.get_shares method"""
        share_path = os.path.join(self.main.shares_dir, 'share')
        self.main.vm.add_share(Share(path=share_path, share_id='share_id',
                                     access_level='Read', accepted=False))
        client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        def shares_handler(shares):
            """ handle get_shares reply """
            self.assertEquals(1, len(shares))
            for share in shares:
                if share['id'] == '':
                    self.assertEquals('', str(share['id']))
                    self.assertEquals(self.root_dir, str(share['path']))
                    self.assertEquals('Modify', str(share['access_level']))
                    self.assertEquals('False', str(share['accepted']))
                else:
                    self.assertEquals('share_id', str(share['id']))
                    self.assertEquals(share_path, str(share['path']))
                    self.assertEquals('Read', str(share['access_level']))
                    self.assertEquals('False', str(share['accepted']))
            d.callback(True)

        client.call_method('get_shares', reply_handler=shares_handler,
                           error_handler=self.error_handler)
        return d

00350     def test_accept_share(self):
        """ Test the accept_share method in dbus_interface.Share """
        share_path = os.path.join(self.main.shares_dir, 'share')
        self.main.vm.add_share(Share(path=share_path, share_id='share_id',
                                     access_level='Read', accepted=False,
                                     subtree="node_id"))
        self.assertEquals(False, self.main.vm.shares['share_id'].accepted)
        client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)

        d = defer.Deferred()
        def reply_handler(result):
            """ handle get_shares reply """
            self.assertEquals(None, result)
            self.assertEquals(True, self.main.vm.shares['share_id'].accepted)
            d.callback(True)

        client.call_method('accept_share', 'share_id',
                           reply_handler=reply_handler,
                           error_handler=self.error_handler)
        return d

00371     def test_reject_share(self):
        """ Test the reject_share method in dbus_interface.Share """
        share_path = os.path.join(self.main.shares_dir, 'share')
        self.main.vm.add_share(Share(path=share_path, share_id='share_id',
                                     access_level='Read', accepted=False))
        self.assertEquals(False, self.main.vm.shares['share_id'].accepted)
        client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)

        d = defer.Deferred()
        def reply_handler(result):
            """ handle get_shares reply """
            self.assertEquals(None, result)
            self.assertEquals(False, self.main.vm.shares['share_id'].accepted)
            d.callback(True)

        client.call_method('reject_share', 'share_id',
                           reply_handler=reply_handler,
                           error_handler=self.error_handler)
        return d

00391     def test_get_root(self):
        """ Check SycnDaemon.get_root exposed method """
        client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
        d = defer.Deferred()
        def reply_handler(result):
            """ handle get_rootdir reply """
            current_root = os.path.dirname(self.main.root_dir)
            self.assertEquals(current_root, str(result))
            d.callback(True)
        client.call_method('get_rootdir',
                           reply_handler=reply_handler,
                           error_handler=self.error_handler)
        return d

00405     def test_create_share(self):
        """ Test share offering """
        a_dir = os.path.join(self.root_dir, "a_dir")
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        # helper functions, pylint: disable-msg=C0111
        def fake_create_share(*result):
            node_id, username, name, access_level, marker = result
            self.assertEquals('node_id', node_id)
            mdobj = self.fs_manager.get_by_path(a_dir)
            self.assertEquals(self.fs_manager.get_abspath("", mdobj.path),
                              os.path.join(self.main.root_dir, a_dir))
            self.assertEquals(u'test_user', username)
            self.assertEquals(u'share_a_dir', name)
            self.assertEquals('View', access_level)

        self.action_q.create_share = fake_create_share

        def reply_handler(result):
            d.callback(result)

        client.call_method('create_share',
                           a_dir, 'test_user',
                           'share_a_dir', 'View',
                           reply_handler=reply_handler,
                           error_handler=self.error_handler)
        return d

00435     def test_query_by_path(self):
        """ test that query_by_path method work as expected. """
        a_dir = os.path.join(self.root_dir, "a_dir")
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
        d = defer.Deferred()
        # helper functions, pylint: disable-msg=C0111
        def query(items):
            self.assertEquals(1, len(items))
            self.assertEquals(('', 'node_id', ''), items[0])
        self.action_q.query = query
        client.call_method('query_by_path', a_dir,
                           reply_handler=d.callback,
                           error_handler=self.error_handler)
        return d

00452     def test_get_shared(self):
        """ Test Shares.get_shared API. """
        a_dir = os.path.join(self.root_dir, "a_dir")
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        # helper functions, pylint: disable-msg=C0111
        def aq_create_share(*args):
            self.main.event_q.push('AQ_CREATE_SHARE_OK', 'share_id', args[-1])
        self.main.action_q.create_share = aq_create_share
        self.main.vm.create_share(a_dir, 0, 'share_a_dir', 'View')
        client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        def reply_handler(results):
            """ handle get_shared reply """
            self.assertEquals(1, len(results))
            shared = results[0]
            self.assertEquals(a_dir, str(shared['path']))
            self.assertEquals('node_id', str(shared['subtree']))
            self.assertEquals('share_id', str(shared['id']))
            self.assertEquals('View', str(shared['access_level']))
            d.callback(True)
        client.call_method('get_shared',
                           reply_handler=reply_handler,
                           error_handler=self.error_handler)
        return d

00478     def test_get_shared_missing_path(self):
        """ Test Shares.get_shared, but without having the path. """
        a_dir = os.path.join(self.root_dir, "a_dir")
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        # helper functions, pylint: disable-msg=C0111
        def aq_create_share(*args):
            self.main.event_q.push('AQ_CREATE_SHARE_OK', 'share_id', args[-1])
        self.main.action_q.create_share = aq_create_share
        self.main.vm.create_share(a_dir, 0, 'share_a_dir', 'View')
        client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        # remove the md of the subtree from fsm
        self.fs_manager.delete_file(a_dir)
        # set the path to None
        share = self.main.vm.shared['share_id']
        share.path = None
        self.main.vm.shared['share_id'] = share
        d = defer.Deferred()
        def reply_handler(results):
            """ handle get_shared reply """
            self.assertEquals(1, len(results))
            shared = results[0]
            self.assertEquals('', str(shared['path']))
            self.assertEquals('node_id', str(shared['subtree']))
            self.assertEquals('share_id', str(shared['id']))
            self.assertEquals('View', str(shared['access_level']))
            d.callback(True)
        client.call_method('get_shared',
                           reply_handler=reply_handler,
                           error_handler=self.error_handler)
        return d
00509     def test_refresh_shares(self):
        """ Just check that refresh_shares method API works. """
        client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        def reply_handler(result):
            """ handle get_rootdir reply """
            d.callback(True)
        client.call_method('refresh_shares',
                           reply_handler=reply_handler,
                           error_handler=self.error_handler)
        return d

00521     def test_quit(self):
        """ test the quit exposed method. """
        client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
        d = defer.Deferred()
        # helper functions, we need to call quit but don't quit
        # pylint: disable-msg=C0111,W0601,W0602
        def fake_quit():
            pass
        self.main.quit = fake_quit
        def handler(result):
            d.callback(True)
        d.addCallback(lambda result: self.assertTrue(result))
        client.call_method('quit',
                           reply_handler=handler,
                           error_handler=self.error_handler)
        return d


00539 class DBusInterfaceUnicodeTests(DBusTwistedTestCase):
    """ Unicode variant of basic tests to the objects exposed with D-Bus"""

00542     def test_filesystem_unicode(self):
        """ Test the FileSystem Object, registering it to the session bus, and
        excercising the API.
        """
        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(share_path, share_id='share'))
        path = os.path.join(share_path, u'ñoño'.encode('utf-8'))
        self.fs_manager.create(path, "share")
        self.fs_manager.set_node_id(path, "node_id")
        d = defer.Deferred()
        def handler(metadata):
            """ reply handler """
            d.callback(metadata)

        def callback(result):
            """ callback to check the result. """
            self.assertEquals(u'\xf1o\xf1o', unicode(result['path']))
            self.assertEquals('share', result['share_id'])
            self.assertEquals('node_id', result['node_id'])

        d.addCallback(callback)
        client = DBusClient(self.bus, '/filesystem', DBUS_IFACE_FS_NAME)
        client.call_method('get_metadata', path, reply_handler=handler,
                           error_handler=self.error_handler)
        return d

00568     def test_query_by_path_unicode(self):
        """ test that query_by_path method work as expected. """
        a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        client = DBusClient(self.bus, '/', DBUS_IFACE_SYNC_NAME)
        d = defer.Deferred()
        # helper functions, pylint: disable-msg=C0111
        def query(items):
            self.assertEquals(1, len(items))
            self.assertEquals(('', 'node_id', ''), items[0])
        self.action_q.query = query
        client.call_method('query_by_path', a_dir,
                           reply_handler=d.callback,
                           error_handler=self.error_handler)
        return d

00585     def test_create_share_unicode(self):
        """ Test share offering """
        a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()
        # helper functions, pylint: disable-msg=C0111
        def fake_create_share(*result):
            node_id, username, name, access_level, marker = result
            self.assertEquals('node_id', node_id)
            mdobj = self.fs_manager.get_by_path(a_dir)
            self.assertEquals(self.fs_manager.get_abspath("", mdobj.path),
                              os.path.join(self.root_dir, a_dir))
            self.assertEquals(u'test_user', username)
            self.assertEquals(u'share_ñoño', name)
            self.assertEquals('View', access_level)

        self.action_q.create_share = fake_create_share

        def reply_handler(result):
            d.callback(result)

        client.call_method('create_share',
                           a_dir, u'test_user',
                           u'share_ñoño', 'View',
                           reply_handler=reply_handler,
                           error_handler=self.error_handler)
        return d

00615     def test_get_shared_unicode(self):
        """ test that list_shared method behaves with unicode data """
        a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        share = Share(a_dir, share_id='shared_id', name=u'ñoño_shared',
                      access_level='View', other_username=u'test_username',
                      subtree='node_id')
        self.main.vm.add_shared(share)
        client = DBusClient(self.bus, '/shares', DBUS_IFACE_SHARES_NAME)
        d = defer.Deferred()

        def check(results):
            """ check the returned values. """
            self.assertEquals(1, len(results))
            shared = results[0]
            self.assertEquals(a_dir, shared['path'].encode('utf-8'))
            self.assertEquals('node_id', str(shared['subtree']))
            self.assertEquals('shared_id', str(shared['id']))
            self.assertEquals('View', str(shared['access_level']))

        d.addCallback(check)
        client.call_method('get_shared',
                           reply_handler=d.callback, error_handler=d.errback)
        return d


00642 class DBusSignalTest(DBusTwistedTestCase):
    """ Test the DBus signals. """

00645     def test_event_signal(self):
        """ Test the Event signal. """
        client = DBusClient(self.bus, '/events', DBUS_IFACE_EVENTS_NAME)
        d = defer.Deferred()
        def handler(event_dict):
            """ signal handler """
            self.assertEquals({'path':'foo', 'name':'FS_FILE_CREATE'},
                              event_dict)
            d.callback(True)

        match = self.bus.add_signal_receiver(handler, signal_name='Event')
        self.signal_receivers.add(match)
        client.send_signal('Event', {'name':'FS_FILE_CREATE', 'path':'foo'})

        return d

00661     def test_download_started(self):
        """ Test the DBus signals in Status """
        a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)
        d = defer.Deferred()
        def download_handler(path):
            """ handler for DownloadStarted signal. """
            self.assertEquals(a_dir, path.encode('utf-8'))
            d.callback(True)

        match = self.bus.add_signal_receiver(download_handler,
                                     signal_name='DownloadStarted')
        self.signal_receivers.add(match)
        self.main.event_q.push('AQ_DOWNLOAD_STARTED', '', 'node_id', '')
        return d

00679     def test_download_finished(self):
        """ Test the DBus signals in Status """
        a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)
        d = defer.Deferred()
        def download_handler(path):
            """ handler for DownloadFinished signal. """
            self.assertEquals(a_dir, path.encode('utf-8'))
            d.callback(True)

        match = self.bus.add_signal_receiver(download_handler,
                                     signal_name='DownloadFinished')
        self.signal_receivers.add(match)
        self.main.event_q.push('AQ_DOWNLOAD_FINISHED', '', 'node_id', '')
        return d

00697     def test_upload_started(self):
        """ Test the DBus signals in Status """
        a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)

        d = defer.Deferred()
        def upload_handler(path):
            """ handler for UploadStarted signal. """
            self.assertEquals(a_dir, path.encode('utf-8'))
            d.callback(True)

        match = self.bus.add_signal_receiver(upload_handler,
                                     signal_name='UploadStarted')
        self.signal_receivers.add(match)
        self.main.event_q.push('AQ_UPLOAD_STARTED', '', 'node_id', '')
        return d

00716     def test_upload_finished(self):
        """ Test the DBus signals in Status """
        a_dir = os.path.join(self.root_dir, u'ñoño'.encode('utf-8'))
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)

        d = defer.Deferred()
        def upload_handler(path):
            """ handler for UploadFinished signal. """
            self.assertEquals(a_dir, path.encode('utf-8'))
            d.callback(True)

        match = self.bus.add_signal_receiver(upload_handler,
                                     signal_name='UploadFinished')
        self.signal_receivers.add(match)
        self.main.event_q.push('AQ_UPLOAD_FINISHED', '', 'node_id', '')
        return d

00735     def test_status_changed(self):
        """ Test the DBus signals in Status """
        client = DBusClient(self.bus, '/status', DBUS_IFACE_STATUS_NAME)
        def start(result):
            """ start the test """
            match = self.bus.add_signal_receiver(status_handler,
                                     signal_name='StatusChanged')
            self.signal_receivers.add(match)
            bool_str = self.dbus_iface.status.bool_str
            state = states.IDLE
            state_dict = {'name':state.name,
                          'description':state.description,
                          'is_error':bool_str(state.is_error),
                          'is_connected':bool_str(state.is_connected),
                          'is_online':bool_str(state.is_online)}
            client.send_signal('StatusChanged', state_dict)
        ready = self.main.wait_for_nirvana()
        ready.addCallback(start)
        d = defer.Deferred()
        def status_handler(result):
            """ handler for StatusChanged signal. """
            self.log.debug('status_handler, received: %r' % result)
            state = states.IDLE
            self.assertEquals(state.name, result['name'])
            self.assertEquals(state.description, result['description'])
            self.assertEquals(state.is_error, bool(result['is_error']))
            self.assertEquals(state.is_connected, bool(result['is_connected']))
            self.assertEquals(state.is_online, bool(result['is_online']))
            d.callback(True)

        return d

00767     def test_all_events_signal(self):
        """ Tests that we are listening all events """
        # create the required items in the fs manager
        share_path = os.path.join(self.shares_dir, 'share')
        self.main.vm.add_share(Share(share_path, share_id='share_id'))
        a_path = os.path.join(share_path, "a_path")
        self.fs_manager.create(a_path, "share_id")
        self.fs_manager.set_node_id(a_path, "node_id")
        event_names = sorted(event_queue.EVENTS.keys())
        events_copy = event_queue.EVENTS.copy()
        d = defer.Deferred()
        def event_handler(event_dict):
            """ event handler that pop the received event from a list """
            event_args = events_copy.pop(str(event_dict.pop('event_name')))
            self.assertEquals(len(event_args), len(event_dict))
            if len(events_copy) == 0:
                cleanup(None)
                d.callback(True)

        def callback(result):
            """ default callback """
            return result

        def errback(error):
            """ error callback """
            missed_events = sorted(events_copy.keys())
            if missed_events:
                self.fail('Missed: ' + str(missed_events))
            else:
                return error

        d.addCallbacks(callback, errback)

        def cleanup(result):
            """ remove the signal receiver """
            self.event_q.subscribe(self.main.vm)
        d.addCallback(cleanup)
        # unsubscribe the vm subscribed in FakeMain as we are going to push
        # fake events
        self.event_q.unsubscribe(self.main.vm)
        match = self.bus.add_signal_receiver(event_handler, signal_name='Event')
        self.signal_receivers.add(match)
        for event_name in event_names:
            args = event_queue.EVENTS[event_name]
            self.event_q.push(event_name, *args)

        return d
    test_all_events_signal.skip = "This test fire fake events that AQ, FSM" \
                                  " and VM can't handle"

00817     def test_share_changed(self):
        """ Test the ShareChanged signal. """
        share_path = os.path.join(self.main.shares_dir, 'share')
        self.main.vm.add_share(Share(path=share_path, share_id='a_share_id',
                                     access_level='Read', accepted=False))
        share_holder = NotifyShareHolder.from_params('a_share_id', 'subtree',
                                                     u'fake_share',
                                                     u'test_username',
                                                     u'visible_name', 'Write')

        d = defer.Deferred()
        def share_handler(share):
            """ handler for ShareChanged signal. """
            self.assertEquals('a_share_id', str(share['id']))
            self.assertEquals(share_path, str(share['path']))
            self.assertEquals('Write', str(share['access_level']))
            self.assertEquals('False', str(share['accepted']))
            d.callback(True)

        match = self.bus.add_signal_receiver(share_handler,
                                     signal_name='ShareChanged')
        self.signal_receivers.add(match)
        self.main.event_q.push('SV_SHARE_CHANGED', 'changed', share_holder)
        return d

00842     def test_share_deleted(self):
        """ Test the ShareDeleted signal. """
        share_path = os.path.join(self.main.shares_dir, 'share')
        self.main.vm.add_share(Share(path=share_path, share_id='a_share_id',
                                     access_level='Read', accepted=False))
        share_holder = NotifyShareHolder.from_params('a_share_id', 'subtree',
                                                     u'fake_share',
                                                     u'test_username',
                                                     u'visible_name', 'Read')

        d = defer.Deferred()
        def share_handler(share_dict):
            """ handler for ShareDeletedsignal. """
            expected_dict = dict(share_id='a_share_id',
                                 subtree='subtree',
                                 share_name=u'fake_share',
                                 from_username=u'test_username',
                                 from_visible_name=u'visible_name',
                                 access_level='Read')
            for k, v in share_dict.items():
                self.assertEquals(expected_dict[str(k)], str(v))
            d.callback(True)

        match = self.bus.add_signal_receiver(share_handler,
                                     signal_name='ShareDeleted')
        self.signal_receivers.add(match)

        self.main.event_q.push('SV_SHARE_CHANGED', 'deleted', share_holder)
        return d

00872     def test_share_created(self):
        """ Test the ShareCreated signal. """
        a_dir = os.path.join(self.root_dir, "a_dir")
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        mdid = self.fs_manager.get_by_node_id("", "node_id")
        marker = MDMarker(mdid)
        d = defer.Deferred()
        def share_handler(result):
            """ handler for ShareCreated signal. """
            self.assertEquals('share_id', str(result['share_id']))
            d.callback(True)

        match = self.bus.add_signal_receiver(share_handler,
                                     signal_name='ShareCreated')
        self.signal_receivers.add(match)
        self.main.event_q.push('AQ_CREATE_SHARE_OK', 'share_id', marker)
        return d

00891     def test_share_created_error(self):
        """ Test the ShareCreated signal. """
        a_dir = os.path.join(self.root_dir, "a_dir")
        self.fs_manager.create(a_dir, "", is_dir=True)
        self.fs_manager.set_node_id(a_dir, "node_id")
        mdid = self.fs_manager.get_by_node_id("", "node_id")
        marker = MDMarker(mdid)
        error_msg = 'a error message'
        d = defer.Deferred()
        def share_handler(info, error):
            """ handler for ShareCreated signal. """
            self.assertEquals(str(marker), str(info['marker']))
            self.assertEquals(error, error_msg)
            d.callback(True)

        match = self.bus.add_signal_receiver(share_handler,
                                     signal_name='ShareCreateError')
        self.signal_receivers.add(match)
        self.main.event_q.push('AQ_CREATE_SHARE_ERROR', marker, error_msg)
        return d

00912     def test_signal_error(self):
        """ test sending SignalError. """
        d = defer.Deferred()
        def error_handler(type, args):
            """ Error signal handler. """
            self.assertEquals('node_id', args['node_id'].decode('utf-8'))
            self.assertEquals('', args['share_id'].decode('utf-8'))
            d.callback(True)

        error_match = self.bus.add_signal_receiver(error_handler,
                                     signal_name='SignalError',
                                     dbus_interface=DBUS_IFACE_SYNC_NAME)
        self.signal_receivers.add(error_match)
        # force a invalid AQ_UPLOAD_FINISHED
        self.main.event_q.push('AQ_UPLOAD_FINISHED', '', 'node_id', 'hash')
        return d


def test_suite():
    # pylint: disable-msg=C0111
    return unittest.TestLoader().loadTestsFromName(__name__)

Generated by  Doxygen 1.6.0   Back to index