Logo Search packages:      
Sourcecode: ubuntuone-client version File versions

test_tools.py

#
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
"""tests for the syncdaemon tools module """
import os

from ubuntuone.syncdaemon import (
    event_queue,
    tools,
    volume_manager,
    states,
)
from contrib.testing.testcase import (
    DBusTwistedTestCase,
)
from twisted.internet import reactor


00032 class TestToolsBasic(DBusTwistedTestCase):
    """ Baic test of SyncDaemonTool """

    def setUp(self):
        DBusTwistedTestCase.setUp(self)
        self.tool = tools.SyncDaemonTool(self.bus)

00039     def create_file(self, path):
        """ creates a test file in fsm """
        share_path = os.path.join(self.shares_dir, 'share_tools')
        self.main.vm.add_share(volume_manager.Share(share_path,
                                                    share_id='tools_share_id'))
        self.fs_manager.create(path, "tools_share_id")
        self.fs_manager.set_node_id(path, "node_id")
        return 'tools_share_id', 'node_id'

00048     def test_wait_connected(self):
        """ test wait_connected """
        self.action_q.connect()
        d = self.tool.wait_connected()
        # test callback, pylint: disable-msg=C0111
        def connected(result):
            self.assertEquals(True, result)
        d.addBoth(connected)
        return d

00058     def test_wait_no_more_events(self):
        """ test wait_no_more_events """
        d = self.tool.wait_no_more_events(last_event_interval=.1)
        # test callback, pylint: disable-msg=C0111
        event_names = event_queue.EVENTS.keys()
        def events(result):
            self.assertEquals(True, result)

        def fire_events():
            # unsubscribe the vm subscribed in FakeMain as we are going to push
            # fake events
            self.event_q.unsubscribe(self.main.vm)
            for event_name in event_names:
                args = event_queue.EVENTS[event_name]
                self.event_q.push(event_name, *args)

        reactor.callLater(0, self.action_q.connect)
        reactor.callLater(0, fire_events)
        d.addBoth(events)
        d.addCallback(lambda _: self.event_q.subscribe(self.main.vm))
        return d

00080     def test_all_downloads(self):
        """ test wait_all_downloads """
        d = self.tool.wait_all_downloads()

        # test callback, pylint: disable-msg=C0111
        def downloads(result):
            self.assertEquals(True, result)
        d.addBoth(downloads)
        return d

00090     def test_all_uploads(self):
        """ test wait_all_uploads """
        d = self.tool.wait_all_uploads()

        # test callback, pylint: disable-msg=C0111
        def uploads(result):
            self.assertEquals(True, result)
        d.addBoth(uploads)
        return d

00100     def test_wait_for_nirvana(self):
        """ test wait_for_nirvana """
        self.main.state = states.IDLE
        d = self.tool.wait_for_nirvana(last_event_interval=.1)

        # test callback, pylint: disable-msg=C0111
        def callback(result):
            self.assertEquals(True, result)
        d.addBoth(callback)
        # clear downloading
        reactor.callLater(0, self.action_q.connect)
        event_names = event_queue.EVENTS.keys()
        def fire_events():
            # unsubscribe the vm subscribed in FakeMain as we are going to push
            # fake events
            self.event_q.unsubscribe(self.main.vm)
            for event_name in event_names:
                args = event_queue.EVENTS[event_name]
                self.event_q.push(event_name, *args)

        # fire fake events to keep the deferred running
        reactor.callLater(0, fire_events)
        # 1 sec later, clear the download queue, and wait to reach nirvana
        d.addCallback(lambda _: self.event_q.subscribe(self.main.vm))
        return d

00126     def test_get_metadata(self):
        """ check that get_metadata works as expected """
        path = os.path.join(self.root_dir, "foo")
        self.fs_manager.create(path, "")
        self.fs_manager.set_node_id(path, "node_id")
        d = self.tool.get_metadata(path)
        # the callback, pylint: disable-msg=C0111
        def callback(result):
            self.assertEquals('foo', str(result['path']))
            self.assertEquals('', str(result['share_id']))
            self.assertEquals('node_id', result['node_id'])
        d.addCallback(callback)
        return d

Generated by  Doxygen 1.6.0   Back to index