Logo Search packages:      
Sourcecode: ubuntuone-client version File versions

test_localrescan.py

#
# Author: Facundo Batista <facundo@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

'''Tests for the Event Queue.'''

from __future__ import with_statement

import unittest
import os
import shutil

from contrib.testing import testcase
from ubuntuone.syncdaemon.local_rescan import LocalRescan
from ubuntuone.syncdaemon import event_queue
from ubuntuone.syncdaemon import filesystem_manager
from ubuntuone.syncdaemon.volume_manager import Share
from ubuntuone.storageprotocol.hash import content_hash_factory

from twisted.trial.unittest import TestCase as TwistedTestCase
from twisted.internet import defer, reactor

TESTS_DIR = os.path.join(os.getcwd(), "tmp")
if not os.path.exists(TESTS_DIR):
    os.makedirs(TESTS_DIR)

00040 class FakeEQ(object):
    '''Fake EQ'''
    def __init__(self):
        self.pushed = []

00045     def push(self, event, path):
        '''Store stuff as pushed.'''
        self.pushed.append((event, path))

00049     def freeze_begin(self, *a):
        '''fake'''
    inotify_add_watch = freeze_rollback = freeze_begin

00053     def freeze_commit(self, events):
        '''just store events'''
        self.pushed.extend(events)

00057 class BaseTestCase(unittest.TestCase):
    """ Base test case """

00060     def setUp(self):
        """ Setup the test """
        try:
            os.mkdir(TESTS_DIR)
        except OSError:
            # already there, remove it to clean and create again
            shutil.rmtree(TESTS_DIR)
            os.mkdir(TESTS_DIR)
        self.shares_dir = os.path.join(TESTS_DIR, 'shares')
        os.makedirs(self.shares_dir)
        self.fsmdir = os.path.join(TESTS_DIR, "fsmdir")
        self.vm = testcase.FakeVolumeManager(os.path.join(TESTS_DIR, "usrdir"))
        self.fsm = filesystem_manager.FileSystemManager(self.fsmdir, self.vm)
        self.eq = FakeEQ()

00075     def tearDown(self):
        """ Clean up the tests. """
        shutil.rmtree(TESTS_DIR)

    @staticmethod
00080     def create_share(share_id, share_name, fsm, shares_dir,
                     access_level='Modify'):
        """ creates a share """
        share_path = os.path.join(shares_dir, share_name)
        os.makedirs(share_path)
        share = Share(share_path, share_id=share_id,
                      access_level=access_level)
        fsm.vm.add_share(share)
        return share

00090     def create_node(self, path, is_dir, real=True, which_share=None):
        '''Creates a node, really (maybe) and in the metadata.'''
        if which_share is None:
            which_share = self.share
        filepath = os.path.join(which_share.path, path)
        if real:
            if is_dir:
                os.mkdir(filepath)
            else:
                open(filepath, "w").close()

        self.fsm.create(filepath, which_share.id, is_dir=is_dir)
        self.fsm.set_node_id(filepath, "uuid"+path)
        return filepath


00106 class CollectionTests(BaseTestCase):
    '''Test to check how LocalRescan gathers the dirs to scan.'''

00109     def test_init(self):
        '''Test the init params.'''
        self.assertRaises(TypeError, LocalRescan, 1)
        self.assertRaises(TypeError, LocalRescan, 1, 2)
        self.assertRaises(TypeError, LocalRescan, 1, 2, 3, 4)
        lr = LocalRescan(self.vm, self.fsm, self.eq)

00116     def test_empty_ro(self):
        '''Test with one empty View share.'''
        # create the share

        lr = LocalRescan(self.vm, self.fsm, self.eq)

        toscan = []
        def f():
            '''helper'''
            toscan.extend(x[1] for x in lr._queue)
        lr._queue_scan = f
        lr.start()
        self.assertEqual(toscan, [self.vm.root.path])

00130     def test_empty_rw(self):
        '''Test with one empty Modify share.'''
        # create the share
        share = self.create_share('share_id', 'ro_share',  self.fsm,
                                       self.shares_dir, access_level='Modify')
        mdid = self.fsm.create(share.path, "share_id", is_dir=True)
        self.fsm.set_node_id(share.path, "uuid")

        lr = LocalRescan(self.vm, self.fsm, self.eq)

        toscan = []
        def f():
            '''helper'''
            toscan.extend(x[1] for x in lr._queue)
        lr._queue_scan = f
        lr.start()
        self.assertEqual(sorted(toscan), [share.path, self.vm.root.path])

00148     def test_not_empty_rw(self):
        '''Test with a Modify share with info.'''
        # create the share
        share = self.create_share('share_id', 'ro_share',  self.fsm,
                                       self.shares_dir, access_level='Modify')
        mdid = self.fsm.create(share.path, "share_id", is_dir=True)
        self.fsm.set_node_id(share.path, "uuid1")

        # create a node in the share
        filepath = os.path.join(share.path, "a")
        mdid = self.fsm.create(filepath, "share_id")
        self.fsm.set_node_id(filepath, "uuid2")
        open(filepath, "w").close()

        lr = LocalRescan(self.vm, self.fsm, self.eq)

        toscan = []
        def f():
            '''helper'''
            toscan.extend(x[1] for x in lr._queue)
        lr._queue_scan = f
        lr.start()
        self.assertEqual(sorted(toscan), [share.path, self.vm.root.path])


00173 class TwistedBase(TwistedTestCase, BaseTestCase):
    '''Base class for twisted tests.'''

    timeout = 2

00178     def setUp(self):
        '''set up the test.'''
        BaseTestCase.setUp(self)
        self.deferred = defer.Deferred()
        self.lr = LocalRescan(self.vm, self.fsm, self.eq)

        # create a share
        self.share = self.create_share('share_id', 'ro_share',  self.fsm,
                                       self.shares_dir, access_level='Modify')
        self.fsm.create(self.share.path, "share_id", is_dir=True)
        self.fsm.set_node_id(self.share.path, "uuidshare")

00190     def startTest(self, check_function):
        '''starts the test.'''
        self.deferred.addCallback(lambda x: self.lr.start())
        self.deferred.addCallback(check_function)
        self.deferred.callback(None)


00197 class ComparationTests(TwistedBase):
    '''Test LocalRescan checking differences between disk and metadata.'''

    timeout = 20

00202     def test_empty(self):
        '''Test with an empty share.'''
        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [])

        self.startTest(check)
        return self.deferred

00211     def test_equals(self):
        '''Test with a share with the same files as metadata.'''
        # create a node in the share
        self.create_node("a", is_dir=False)

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [])

        self.startTest(check)
        return self.deferred

00223     def test_disc_more_file_empty(self):
        '''Test having an empty file more in disc than in metadata.'''
        # create a node in the share
        self.create_node("a", is_dir=False)

        # and another file in disk
        otherpath = os.path.join(self.share.path, "b")
        open(otherpath, "w").close()

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_FILE_CREATE', otherpath)])

        self.startTest(check)
        return self.deferred

00239     def test_disc_more_file_content(self):
        '''Test having a file (with content) more in disc than in metadata.'''
        # create a node in the share
        self.create_node("a", is_dir=False)

        # and another file in disk
        path = os.path.join(self.share.path, "b")
        with open(path, "w") as fh:
            fh.write("foo")

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_FILE_CREATE', path),
                                              ('FS_FILE_CLOSE_WRITE', path)])

        self.startTest(check)
        return self.deferred

00257     def test_disc_symlink(self):
        '''Test having a symlink in disc.'''
        # create a node in the share
        source = self.create_node("a", is_dir=False)

        # and a symlink to ignore!
        symlpath = os.path.join(self.share.path, "b")
        os.symlink(source, symlpath)

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [])

        self.startTest(check)
        return self.deferred

00273     def test_disc_more_dir(self):
        '''Test having a dir more in disc than in metadata.'''
        # create a node in the share
        self.create_node("a", is_dir=False)

        # and another dir in disk
        otherpath = os.path.join(self.share.path, "b")
        os.mkdir(otherpath)

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_DIR_CREATE', otherpath)])

        self.startTest(check)
        return self.deferred

00289     def test_disc_less_file(self):
        '''Test having less in disc than in metadata.'''
        # create a node in the share, but no in disk
        filepath = self.create_node("a", is_dir=False, real=False)

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_FILE_DELETE', filepath)])

        self.startTest(check)
        return self.deferred

00301     def test_disc_less_dir(self):
        '''Test having less in disc than in metadata.'''
        # create a node in the share, but no in disk
        filepath = self.create_node("a", is_dir=True, real=False)

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_DIR_DELETE', filepath)])

        self.startTest(check)
        return self.deferred

00313     def test_differenttype_dir(self):
        '''Test that it should be a dir but now it's a file.'''
        # create one type in the share, other in disk
        thispath = self.create_node("a", is_dir=True, real=False)
        open(thispath, "w").close()

        def check(_):
            '''checks all is ok.'''
            # don't sort, as the DELETE must come before the CREATE
            events = self.eq.pushed
            self.assertEqual(events[0], ('FS_DIR_DELETE', thispath))
            self.assertEqual(events[1], ('FS_FILE_CREATE', thispath))

        self.startTest(check)
        return self.deferred

00329     def test_differenttype_file(self):
        '''Test that it should be a file but now it's a dir.'''
        # create one type in the share, other in disk
        thispath = self.create_node("a", is_dir=False, real=False)
        os.mkdir(thispath)

        def check(_):
            '''checks all is ok.'''
            # don't sort, as the DELETE must come before the CREATE
            events = self.eq.pushed
            self.assertEqual(events[0], ('FS_FILE_DELETE', thispath))
            self.assertEqual(events[1], ('FS_DIR_CREATE', thispath))

        self.startTest(check)
        return self.deferred

00345     def test_complex_scenario(self):
        '''Several dirs, several files, some differences.'''
        self.create_node("a", is_dir=True)
        self.create_node("a/b", is_dir=True)
        sh1 = self.create_node("a/b/e", is_dir=True, real=False)
        self.create_node("a/c", is_dir=True)
        self.create_node("a/c/d", is_dir=False)
        sh2 = self.create_node("a/c/e", is_dir=False, real=False)
        self.create_node("d", is_dir=True)
        sh3 = self.create_node("e", is_dir=False, real=False)
        sh4 = self.create_node("f", is_dir=True, real=False)
        sh5 = os.path.join(self.share.path, "j")
        open(sh5, "w").close()
        sh6 = os.path.join(self.share.path, "a", "c", "q")
        open(sh6, "w").close()
        sh7 = os.path.join(self.share.path, "k")
        os.mkdir(sh7)
        sh8 = os.path.join(self.share.path, "a", "p")
        os.mkdir(sh8)

        # scan!
        def check(_):
            '''check'''
            self.assertEqual(sorted(self.eq.pushed), [
                ('FS_DIR_CREATE', sh8),
                ('FS_DIR_CREATE', sh7),
                ('FS_DIR_DELETE', sh1),
                ('FS_DIR_DELETE', sh4),
                ('FS_FILE_CREATE', sh6),
                ('FS_FILE_CREATE', sh5),
                ('FS_FILE_DELETE', sh2),
                ('FS_FILE_DELETE', sh3),
            ])

        self.startTest(check)
        return self.deferred

00382     def test_deep_and_wide(self):
        '''Lot of files in a dir, and lots of dirs.'''
        # almost all known, to force the system to go deep
        dirs = "abcdefghijklmnopq" * 20
        for i in range(1, len(dirs)+1):
            dirpath = os.path.join(*dirs[:i])
            self.create_node(dirpath, is_dir=True)
        basedir = os.path.join(*dirs)
        self.create_node(os.path.join(basedir, "file1"), is_dir=False)
        path = os.path.join(basedir, "file2")
        sh1 = self.create_node(path, is_dir=False, real=False)

        # some files in some dirs
        files = "rstuvwxyz"
        for f in files:
            path = os.path.join(*dirs[:3]+f)
            self.create_node(path, is_dir=False)
            path = os.path.join(*dirs[:6]+f)
            self.create_node(path, is_dir=False)
        sh2 = os.path.join(self.share.path, *dirs[:6]+"q")
        open(sh2, "w").close()

        def check(_):
            '''check'''
            self.assertEqual(sorted(self.eq.pushed), [
                ('FS_FILE_CREATE', sh2),
                ('FS_FILE_DELETE', sh1),
            ])

        # scan!
        self.startTest(check)
        return self.deferred

00415     def test_subtree_removal(self):
        '''A whole subtree was removed.'''
        self.create_node("a", is_dir=True)
        sh1 = self.create_node("a/b", is_dir=True)
        sh2 = self.create_node("a/b/c", is_dir=True)
        sh3 = self.create_node("a/b/c/d", is_dir=False)

        # remove the whole subtree
        shutil.rmtree(sh1)

        # scan!
        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [
                ('FS_FILE_DELETE', sh3),
                ('FS_DIR_DELETE', sh2),
                ('FS_DIR_DELETE', sh1),
            ])

        self.startTest(check)
        return self.deferred

00437     def test_one_dir_only(self):
        '''Specific subtree only.'''
        self.create_node("a", is_dir=True)
        self.create_node("a/b", is_dir=True)

        # one in both, one only in share, one only in disk
        j = self.create_node("a/b/c", is_dir=True)
        sh1 = self.create_node("a/b/d", is_dir=True, real=False)
        sh2 = os.path.join(self.share.path, "a", "b", "e")
        open(sh2, "w").close()

        # more differences, but not in dir to check
        self.create_node("a/c", is_dir=False)
        os.mkdir(os.path.join(self.share.path, "a", "k"))

        # scan!
        lr = LocalRescan(self.vm, self.fsm, self.eq)
        toscan = os.path.join(self.share.path, "a", "b")

        def check(_):
            '''checks all is ok.'''
            self.assertEqual(len(self.eq.pushed), 2)
            events = sorted(self.eq.pushed)
            self.assertEqual(events[0], ('FS_DIR_DELETE', sh1))
            self.assertEqual(events[1], ('FS_FILE_CREATE', sh2))

        self.deferred.addCallback(lr.scan_dir)
        self.deferred.addCallback(check)
        self.deferred.callback(toscan)
        return self.deferred

00468     def test_one_nonexistant_dir(self):
        '''Specific subtree for a dir that's not in a share or not at all.'''
        lr = LocalRescan(self.vm, self.fsm, self.eq)

        # real dir, but not in share
        self.assertRaises(ValueError, lr.scan_dir, "/tmp")

        # no dir at all
        self.assertRaises(ValueError, lr.scan_dir, "no-dir-at-all")

        # inside a share, but no real dir
        # this should be ok, as it may be just deleted by the user
        nodir = os.path.join(self.share.path, "no-dir-at-all")
        lr.scan_dir(nodir)

        # inside a share, and real, but no really a dir
        nodir = self.create_node("a", is_dir=False)
        self.assertRaises(ValueError, lr.scan_dir, nodir)

00487     def test_one_dir_ro_share(self):
        '''The dir is in a share that's RO, no error but no action.'''
        # create the share
        share = self.create_share('share_id', 'ro_share2',  self.fsm,
                                       self.shares_dir, access_level='View')
        self.fsm.create(share.path, "share_id", is_dir=True)
        self.fsm.set_node_id(share.path, "uuidshare")

        lr = LocalRescan(self.vm, self.fsm, self.eq)
        lr.scan_dir(share.path)

00498     def test_content_changed(self):
        '''Test that it detects the content change.'''
        # create one type in the share, other in disk
        path = self.create_node("a", is_dir=False)
        with open(path, "w") as fh:
            fh.write("foo")

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_FILE_CLOSE_WRITE', path)])

        self.startTest(check)
        return self.deferred

00512     def test_inode_changed(self):
        '''Test that it detects a change using the filedate.'''
        # two files with same dates
        pathx = os.path.join(self.share.path, "x")
        pathy = os.path.join(self.share.path, "y")
        open(pathx, "w").close()
        open(pathy, "w").close()
        thispath = self.create_node("x", is_dir=False)

        # move the second into the first one
        os.rename(pathy, pathx)

        # a & m times will be the same, but not the inode or change time
        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_FILE_CLOSE_WRITE', pathx)])

        self.startTest(check)
        return self.deferred


00533 class InotifyTests(TwistedBase):
    '''Test LocalRescan pushing events to the EventQueue.'''
    timeout = 2

00537     def setUp(self):
        '''Setup the test.'''
        TwistedBase.setUp(self)
        self.eq = event_queue.EventQueue(self.fsm)
        self.lr = LocalRescan(self.vm, self.fsm, self.eq)
        self.real_os_stat = os.stat

00544     def tearDown(self):
        os.stat = self.real_os_stat
        self.eq.shutdown()

00548     def test_man_in_the_middle(self):
        '''Intercept normal work and change the disk.'''
        for c in "abcdefghijk":
            self.create_node(c, is_dir=False)

        # remove a couple, create some new
        sh1 = os.path.join(self.share.path, "d")
        os.remove(sh1)
        sh2 = os.path.join(self.share.path, "f")
        os.remove(sh2)
        sh3 = os.path.join(self.share.path, "jj")
        open(sh3, "w").close()
        sh4 = os.path.join(self.share.path, "kk")
        os.mkdir(sh4)

        # this sh5 will be written in the middle of the scan
        sh5 = os.path.join(self.share.path, "zz")

        should_receive_events = [
            ('FS_DIR_CREATE', sh4),
            ('FS_FILE_CREATE', sh3),
            ('FS_FILE_CREATE', sh5),
            ('FS_FILE_DELETE', sh1),
            ('FS_FILE_DELETE', sh2),
        ]

        # helper class, pylint: disable-msg=C0111
        class HitMe(object):
            # class-closure, cannot use self, pylint: disable-msg=E0213
            def __init__(innerself):
                innerself.hist = []
            def handle_default(innerself, event_name, path):
                innerself.hist.append((event_name, path))
        hm = HitMe()
        self.eq.subscribe(hm)

        # we need to intercept compare, as the stat interception needs
        # to be done after compare() starts.
        # use os.stat to get in the middle of the process, to put some
        # dirt in the testing scenario

        real_compare = self.lr._compare

        def middle_compare(*a1, **k1):
            '''Changes os.stat.'''
            self.lr._compare = real_compare

            def middle_stat(*a2, **k2):
                '''Dirt!'''
                os.stat = self.real_os_stat
                open(sh5, "w").close()
                reactor.iterate(.1)
                return self.real_os_stat(*a2, **k2)

            os.stat = middle_stat
            return real_compare(*a1, **k1)

        self.lr._compare = middle_compare

        def check(_):
            '''check'''
            self.assertEqual(sorted(hm.hist), should_receive_events)

        self.deferred.addCallback(lambda x: self.lr.start())
        self.deferred.addCallback(check)
        self.deferred.callback(None)
        return self.deferred


00617 class QueuingTests(TwistedTestCase, BaseTestCase):
    '''Test that simultaneus calls are queued.'''
    timeout = 2

00621     def setUp(self):
        '''set up the test.'''
        BaseTestCase.setUp(self)
        self.deferred = defer.Deferred()
        self.eq = event_queue.EventQueue(self.fsm)
        self.lr = LocalRescan(self.vm, self.fsm, self.eq)

        # create two shares
        self.share1 = self.create_share('share_id1', 'ro_share_1',  self.fsm,
                                       self.shares_dir, access_level='Modify')
        self.fsm.create(self.share1.path, "share_id1", is_dir=True)
        self.fsm.set_node_id(self.share1.path, "uuidshare1")
        self.share2 = self.create_share('share_id2', 'ro_share_2',  self.fsm,
                                       self.shares_dir, access_level='Modify')
        self.fsm.create(self.share2.path, "share_id2", is_dir=True)
        self.fsm.set_node_id(self.share2.path, "uuidshare2")

        self.real_os_stat = os.stat

00640     def tearDown(self):
        os.stat = self.real_os_stat
        self.eq.shutdown()

00644     def test_intercept_generate_second(self):
        '''Intercept the first work and generate a second scan.'''
        # fill and alter first share
        for c in "abcdefgh":
            self.create_node(c, is_dir=False, which_share=self.share1)
        sh1 = os.path.join(self.share1.path, "d")
        os.remove(sh1)
        sh2 = os.path.join(self.share1.path, "jj")
        open(sh2, "w").close()

        # fill and alter second share
        for c in "abcdefgh":
            self.create_node(c, is_dir=False, which_share=self.share2)
        sh3 = os.path.join(self.share2.path, "e")
        os.remove(sh3)
        sh4 = os.path.join(self.share2.path, "kk")
        open(sh4, "w").close()

        should_receive_events = [
            ('FS_FILE_CREATE', sh2),
            ('FS_FILE_CREATE', sh4),
            ('FS_FILE_DELETE', sh1),
            ('FS_FILE_DELETE', sh3),
        ]

        # helper class, pylint: disable-msg=C0111
        class HitMe(object):
            # class-closure, cannot use self, pylint: disable-msg=E0213
            def __init__(innerself):
                innerself.hist = []
            def handle_default(innerself, event_name, path):
                innerself.hist.append((event_name, path))
        hm = HitMe()
        self.eq.subscribe(hm)

        # we need to intercept compare, as the stat interception needs
        # to be done after compare() starts.
        # use os.stat to get in the middle of the process, to put some
        # dirt in the testing scenario

        real_compare = self.lr._compare

        def middle_compare(*a1, **k1):
            '''Changes os.stat.'''
            self.lr._compare = real_compare

            def middle_stat(*a2, **k2):
                '''Dirt!'''
                os.stat = self.real_os_stat
                self.lr.scan_dir(self.share2.path)
                return self.real_os_stat(*a2, **k2)

            os.stat = middle_stat
            return real_compare(*a1, **k1)

        self.lr._compare = middle_compare

        def check(_):
            '''check'''
            self.assertEqual(sorted(hm.hist), should_receive_events)

        self.deferred.addCallback(self.lr.scan_dir)
        self.deferred.addCallback(check)
        self.deferred.callback(self.share1.path)
        return self.deferred


00711 class PushTests(TwistedBase):
    '''Test LocalRescan pushing events to the EventQueue.'''
    timeout = 2

00715     def setUp(self):
        '''Setup the test.'''
        TwistedBase.setUp(self)
        self.eq = event_queue.EventQueue(self.fsm)
        self.lr = LocalRescan(self.vm, self.fsm, self.eq)

00721     def tearDown(self):
        '''Cleanup the test.'''
        self.eq.shutdown()
        BaseTestCase.tearDown(self)

00726     def test_one_dir_create(self):
        '''Check that an example dir create is really pushed.'''
        # helper class, pylint: disable-msg=C0111
        result = []
        class Listener(object):
            def handle_FS_DIR_CREATE(self, path):
                result.append(path)

        l = Listener()
        self.eq.subscribe(l)

        def cleanup():
            """ unsubscribe the listeners """
            self.eq.unsubscribe(l)
        self.addCleanup(cleanup)

        # push some event
        filepath = os.path.join(self.share.path, "a")
        os.mkdir(filepath)

        def check(_):
            """ asserts that GoodListener was called. """
            self.assertEquals(1, len(result))
            self.assertEquals(filepath, result[0])

        self.startTest(check)
        return self.deferred

00754     def test_one_file_create(self):
        '''Check that an example file create is really pushed.'''
        # helper class, pylint: disable-msg=C0111
        result = []
        class Listener(object):
            def handle_FS_FILE_CREATE(self, path):
                result.append(path)

        l = Listener()
        self.eq.subscribe(l)

        def cleanup():
            """ unsubscribe the listeners """
            self.eq.unsubscribe(l)
        self.addCleanup(cleanup)

        # push some event
        filepath = os.path.join(self.share.path, "a")
        open(filepath, "w").close()

        def check(_):
            """ asserts that GoodListener was called. """
            self.assertEquals(1, len(result))
            self.assertEquals(filepath, result[0])

        self.startTest(check)
        return self.deferred

00782     def test_one_dir_delete(self):
        '''Check that an example dir delete is really pushed.'''

        # helper class, pylint: disable-msg=C0111
        result = []
        class Listener(object):
            def handle_FS_DIR_DELETE(self, path):
                result.append(path)

        l = Listener()
        self.eq.subscribe(l)

        def cleanup():
            """ unsubscribe the listeners """
            self.eq.unsubscribe(l)
        self.addCleanup(cleanup)

        # push some event
        filepath = os.path.join(self.share.path, "a")
        self.fsm.create(filepath, self.share.id, is_dir=True)
        self.fsm.set_node_id(filepath, "uuid1")

        def check(_):
            """ asserts that GoodListener was called. """
            self.assertEquals(1, len(result))
            self.assertEquals(filepath, result[0])

        self.startTest(check)
        return self.deferred

00812     def test_one_file_delete(self):
        '''Check that an example file delete is really pushed.'''
        # helper class, pylint: disable-msg=C0111
        result = []
        class Listener(object):
            def handle_FS_FILE_DELETE(self, path):
                result.append(path)

        l = Listener()
        self.eq.subscribe(l)

        def cleanup():
            """ unsubscribe the listeners """
            self.eq.unsubscribe(l)
        self.addCleanup(cleanup)

        # push some event
        filepath = os.path.join(self.share.path, "a")
        self.fsm.create(filepath, self.share.id, is_dir=False)
        self.fsm.set_node_id(filepath, "uuid1")

        def check(_):
            """ asserts that GoodListener was called. """
            self.assertEquals(1, len(result))
            self.assertEquals(filepath, result[0])

        self.startTest(check)
        return self.deferred

00841     def test_file_changed(self):
        '''Check that an example close write is pushed.'''
        # helper class, pylint: disable-msg=C0111
        result = []
        class Listener(object):
            def handle_FS_FILE_CLOSE_WRITE(self, path):
                result.append(path)

        l = Listener()
        self.eq.subscribe(l)

        def cleanup():
            """ unsubscribe the listeners """
            self.eq.unsubscribe(l)
        self.addCleanup(cleanup)

        # push some event
        filepath = os.path.join(self.share.path, "a")
        self.fsm.create(filepath, self.share.id, is_dir=False)
        self.fsm.set_node_id(filepath, "uuid1")
        with open(filepath, "w") as fh:
            fh.write("foo")

        def check(_):
            """ asserts that GoodListener was called. """
            self.assertEquals(1, len(result))
            self.assertEquals(filepath, result[0])

        self.startTest(check)
        return self.deferred

00872     def test_file_changed_in_nestedstruct(self):
        '''Check that an example close write is pushed.'''
        # helper class, pylint: disable-msg=C0111
        result = []
        class Listener(object):
            def handle_FS_FILE_CLOSE_WRITE(self, path):
                result.append(path)

        l = Listener()
        self.eq.subscribe(l)

        def cleanup():
            """ unsubscribe the listeners """
            self.eq.unsubscribe(l)
        self.addCleanup(cleanup)

        # create nested struct
        path = os.path.join(self.share.path, "a")
        self.fsm.create(path, self.share.id, is_dir=True)
        self.fsm.set_node_id(path, "uuid1")
        os.mkdir(path)
        path = os.path.join(self.share.path, "a", "b")
        self.fsm.create(path, self.share.id, is_dir=True)
        self.fsm.set_node_id(path, "uuid2")
        os.mkdir(path)
        path = os.path.join(self.share.path, "a", "b", "c")
        self.fsm.create(path, self.share.id, is_dir=False)
        self.fsm.set_node_id(path, "uuid3")
        open(path, "w").close()

        # push some event
        with open(path, "w") as fh:
            fh.write("foo")

        def check(_):
            """ asserts that GoodListener was called. """
            self.assertEquals(1, len(result))
            self.assertEquals(path, result[0])

        self.startTest(check)
        return self.deferred

00914     def test_conflict_file(self):
        '''Found a .conflict file.'''
        # helper class, pylint: disable-msg=C0111
        class Listener(object):
            def __init__(self):
                self.hit = False
            def handle_default(self, *args):
                self.hit = True

        listener = Listener()
        self.eq.subscribe(listener)

        def cleanup():
            """ unsubscribe the listeners """
            self.eq.unsubscribe(listener)
        self.addCleanup(cleanup)

        # push some event
        path = os.path.join(self.share.path, "foobar.conflict")
        open(path, "w").close()

        def check(_):
            '''checks no messages and file still there'''
            self.assertFalse(listener.hit)
            self.assertTrue(os.path.exists(path))

        self.startTest(check)
        return self.deferred

00943     def test_conflict_dir(self):
        '''Found a .conflict dir.'''
        # helper class, pylint: disable-msg=C0111
        class Listener(object):
            def __init__(self):
                self.hit = False
            def handle_default(self, *args):
                self.hit = True

        listener = Listener()
        self.eq.subscribe(listener)

        def cleanup():
            """ unsubscribe the listeners """
            self.eq.unsubscribe(listener)
        self.addCleanup(cleanup)

        # push some event
        path = os.path.join(self.share.path, "foobar.conflict")
        os.mkdir(path)

        def check(_):
            '''checks no messages and file still there'''
            self.assertFalse(listener.hit)
            self.assertTrue(os.path.exists(path))

        self.startTest(check)
        return self.deferred



00974 class BadStateTests(TwistedBase):
    '''Test what happens with those files left in a bad state last time.'''

00977     def _hash(self, path):
        '''Hashes a file.'''
        hasher = content_hash_factory()
        with open(path) as fh:
            while True:
                cont = fh.read(65536)
                if not cont:
                    break
                hasher.update(cont)
        return hasher.content_hash()

00988     def test_no_uuid_empty(self):
        '''Found an empty file that does not have uuid yet.'''
        path = os.path.join(self.share.path, "a")
        self.fsm.create(path, self.share.id, is_dir=False)
        open(path, "w").close()

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_FILE_CREATE', path)])
            self.assertEqual(self.fsm.has_metadata(path=path), False)

        self.startTest(check)
        return self.deferred

01002     def test_no_uuid_all_family(self):
        '''Fast creation paths: all without uuid.'''
        path1 = os.path.join(self.share.path, "a")
        self.fsm.create(path1, self.share.id, is_dir=True)
        os.mkdir(path1)
        path2 = os.path.join(self.share.path, "a", "b")
        self.fsm.create(path2, self.share.id, is_dir=False)
        open(path2, "w").close()

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_FILE_CREATE', path2)])
            self.assertEqual(self.fsm.has_metadata(path=path1), True)
            self.assertEqual(self.fsm.has_metadata(path=path2), False)

        self.deferred.addCallback(self.lr.scan_dir)
        self.deferred.addCallback(check)
        self.deferred.callback(path1)
        return self.deferred

01022     def test_no_uuid_content(self):
        '''Found a non empty file that does not have uuid yet.'''
        path = os.path.join(self.share.path, "a")
        self.fsm.create(path, self.share.id, is_dir=False)
        with open(path, "w") as fh:
            fh.write("foo")

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_FILE_CREATE', path),
                                              ('FS_FILE_CLOSE_WRITE', path)])
            self.assertEqual(self.fsm.has_metadata(path=path), False)

        self.startTest(check)
        return self.deferred

01038     def test_LOCAL(self):
        '''We were uploading the file, but it was interrupted.'''
        # create the file in metadata and real
        path = os.path.join(self.share.path, "a")
        mdid = self.fsm.create(path, self.share.id, is_dir=False)
        self.fsm.set_node_id(path, "uuid")
        with open(path, "w") as fh:
            fh.write("foo")

        # start a put file, and assume we got interrupted (no server hash)
        pathhash = self._hash(path)
        self.fsm.set_by_mdid(mdid, local_hash=pathhash)
        self.fsm.refresh_stat(path)

        def check(_):
            '''check'''
            self.assertEqual(self.eq.pushed, [('FS_FILE_CLOSE_WRITE', path)])

        self.startTest(check)
        return self.deferred

01059     def test_SERVER_file_empty(self):
        '''We were downloading the file, but it was interrupted.'''
        # create the file in metadata
        path = os.path.join(self.share.path, "a")
        mdid = self.fsm.create(path, self.share.id, is_dir=False)
        self.fsm.set_node_id(path, "uuid")

        # start the download, never complet it
        self.fsm.set_by_mdid(mdid, server_hash="blah-hash-blah")
        self.fsm.create_partial("uuid", self.share.id)
        fh = self.fsm.get_partial_for_writing("uuid", self.share.id)
        fh.write("foobar")
        self.assertTrue(os.path.exists(path + ".partial"))

        def check(_):
            '''arrange the metadata so later server_rescan will do ok'''
            mdobj = self.fsm.get_by_mdid(mdid)
            self.assertFalse(mdobj.info.is_partial)
            self.assertEqual(mdobj.server_hash, mdobj.local_hash)
            self.assertFalse(os.path.exists(path + ".partial"))

        self.startTest(check)
        return self.deferred

01083     def test_SERVER_file_content(self):
        '''We were downloading the file, but it was interrupted, and changed'''
        # create the file in metadata
        path = os.path.join(self.share.path, "a")
        mdid = self.fsm.create(path, self.share.id, is_dir=False)
        self.fsm.set_node_id(path, "uuid")

        # start the download, never complet it
        self.fsm.set_by_mdid(mdid, server_hash="blah-hash-blah")
        self.fsm.create_partial("uuid", self.share.id)
        fh = self.fsm.get_partial_for_writing("uuid", self.share.id)
        fh.write("foobar")
        self.assertTrue(os.path.exists(path + ".partial"))

        # also change the original file
        with open(path, "w") as fh:
            fh.write("I see dead people")

        def check(_):
            '''arrange the metadata so later server_rescan will do ok'''
            mdobj = self.fsm.get_by_mdid(mdid)
            self.assertFalse(mdobj.info.is_partial)
            self.assertEqual(mdobj.server_hash, mdobj.local_hash)
            self.assertFalse(os.path.exists(path + ".partial"))
            self.assertEqual(self.eq.pushed, [('FS_FILE_CLOSE_WRITE', path)])

        self.startTest(check)
        return self.deferred

01112     def test_SERVER_dir(self):
        '''We were downloading the dir, but it was interrupted.'''
        # create the file in metadata
        path = os.path.join(self.share.path, "a")
        mdid = self.fsm.create(path, self.share.id, is_dir=True)
        self.fsm.set_node_id(path, "uuid")

        # start the download, never complet it
        self.fsm.set_by_mdid(mdid, server_hash="blah-hash-blah")
        self.fsm.create_partial("uuid", self.share.id)
        fh = self.fsm.get_partial_for_writing("uuid", self.share.id)
        fh.write("foobar")
        self.assertTrue(os.path.exists(os.path.join(path, ".partial")))

        # also put a file inside the directory, to check that LR enters in it
        filepath = os.path.join(path, "file")
        open(filepath, "w").close()

        def check(_):
            '''arrange the metadata so later server_rescan will do ok'''
            mdobj = self.fsm.get_by_mdid(mdid)
            self.assertFalse(mdobj.info.is_partial)
            self.assertEqual(mdobj.server_hash, mdobj.local_hash)
            self.assertFalse(os.path.exists(path + ".partial"))
            self.assertEqual(self.eq.pushed, [('FS_FILE_CREATE', filepath)])

        self.startTest(check)
        return self.deferred

01141     def test_partial_nomd(self):
        '''Found a .partial with no metadata at all.'''
        path = os.path.join(self.share.path, "a.partial")
        open(path, "w").close()

        def check(_):
            '''checks everything's ok'''
            self.assertEqual(self.eq.pushed, [])
            self.assertFalse(os.path.exists(path))

        self.startTest(check)
        return self.deferred

01154     def test_directory_bad_changed(self):
        '''Found a dir with 'changed' not SERVER nor NONE.'''
        path1 = os.path.join(self.share.path, "onedir")
        self.fsm.create(path1, self.share.id, is_dir=True)
        self.fsm.set_node_id(path1, "uuid1")
        os.mkdir(path1)
        self.fsm.set_by_path(path1, server_hash="foo")

        # create a child to it
        path2 = os.path.join(self.share.path, "onedir", "file_inside")
        self.fsm.create(path2, self.share.id)
        self.fsm.set_node_id(path2, "uuid2")
        open(path2, "w").close()

        def check(_):
            '''move to conflict, remove MD'''
            self.assertFalse(os.path.exists(path1))
            self.assertTrue(os.path.exists(path1 + ".conflict"))
            self.assertFalse(self.fsm.has_metadata(path=path1))

            # check MD is gone also for children
            self.assertFalse(self.fsm.has_metadata(path=path2))

        self.startTest(check)
        return self.deferred

01180     def test_file_partial_dir(self):
        '''Found a .partial of a file, but MD says it's a dir.'''
        # create the dir in metadata
        path = os.path.join(self.share.path, "a")
        mdid = self.fsm.create(path, self.share.id, is_dir=True)
        self.fsm.set_node_id(path, "uuid")

        # start the download, never complet it
        self.fsm.set_by_mdid(mdid, server_hash="blah-hash-blah")
        self.fsm.create_partial("uuid", self.share.id)
        fh = self.fsm.get_partial_for_writing("uuid", self.share.id)
        fh.write("foobar")
        fh.close()
        self.assertTrue(os.path.exists(os.path.join(path, ".partial")))

        # now change the dir for a file, for LR to find it
        os.remove(os.path.join(path, ".partial"))
        os.rmdir(path)
        partial_path = path + ".partial"
        open(partial_path, "w").close()

        def check(_):
            '''remove MD'''
            self.assertFalse(os.path.exists(path))
            self.assertFalse(os.path.exists(partial_path))
            self.assertFalse(self.fsm.has_metadata(path=path))

        self.startTest(check)
        return self.deferred

01210     def test_file_noSERVER(self):
        '''We were downloading the file,it was interrupted, but no SERVER.'''
        # create the file in metadata
        path = os.path.join(self.share.path, "a")
        mdid = self.fsm.create(path, self.share.id, is_dir=False)
        self.fsm.set_node_id(path, "uuid")

        # start the download, never complet it
        self.fsm.set_by_mdid(mdid, server_hash="blah-hash-blah")
        self.fsm.create_partial("uuid", self.share.id)
        fh = self.fsm.get_partial_for_writing("uuid", self.share.id)
        fh.write("foobar")
        partial_path = path + ".partial"
        self.assertTrue(os.path.exists(partial_path))

        # trick it to not return "SERVER"
        real_mdobj = self.fsm.fs[mdid]
        real_mdobj["info"]["is_partial"] = False
        self.fsm.fs[mdid] = real_mdobj

        def check(_):
            '''no conflict file as the file never existed, remove MD'''
            self.assertFalse(os.path.exists(partial_path))
            self.assertFalse(self.fsm.has_metadata(path=path))

        self.startTest(check)
        return self.deferred

01238     def test_no_file_broken_metadata(self):
        '''Found broken metadata but no file.'''
        path = os.path.join(self.share.path, "a")
        mdid = self.fsm.create(path, self.share.id, is_dir=False)
        self.fsm.set_node_id(path, "uuid")

        # break the metadata
        real_mdobj = self.fsm.fs[mdid]
        real_mdobj["info"]["is_partial"] = True
        self.fsm.fs[mdid] = real_mdobj

        def check(_):
            '''checks everything's ok'''
            self.assertEqual(self.eq.pushed, [])
            self.assertFalse(self.fsm.has_metadata(path=path))

        self.startTest(check)
        return self.deferred

01257     def test_no_dir_broken_metadata_deep(self):
        '''Found broken metadata but no dir.'''
        path1 = os.path.join(self.share.path, "a")
        mdid1 = self.fsm.create(path1, self.share.id, is_dir=True)
        self.fsm.set_node_id(path1, "uuid1")

        path2 = os.path.join(self.share.path, "a", "b")
        mdid2 = self.fsm.create(path2, self.share.id, is_dir=False)
        self.fsm.set_node_id(path2, "uuid2")

        # break both metadatas
        real_mdobj = self.fsm.fs[mdid1]
        real_mdobj["info"]["is_partial"] = True
        self.fsm.fs[mdid1] = real_mdobj

        real_mdobj = self.fsm.fs[mdid2]
        real_mdobj["info"]["is_partial"] = True
        self.fsm.fs[mdid2] = real_mdobj

        def check(_):
            '''checks everything's ok'''
            self.assertEqual(self.eq.pushed, [])
            self.assertFalse(self.fsm.has_metadata(path=path1))
            self.assertFalse(self.fsm.has_metadata(path=path2))

        self.startTest(check)
        return self.deferred

01285     def test_no_dir_broken_metadata(self):
        '''Found broken metadata but no dir.'''
        path = os.path.join(self.share.path, "a")
        mdid = self.fsm.create(path, self.share.id, is_dir=True)
        self.fsm.set_node_id(path, "uuid")

        # break the metadata
        real_mdobj = self.fsm.fs[mdid]
        real_mdobj["info"]["is_partial"] = True
        self.fsm.fs[mdid] = real_mdobj

        def check(_):
            '''checks everything's ok'''
            self.assertEqual(self.eq.pushed, [])
            self.assertFalse(self.fsm.has_metadata(path=path))

        self.startTest(check)
        return self.deferred

01304     def test_no_dir_LOCAL_metadata(self):
        '''Found metadata with 'changed' in LOCAL but no dir.'''
        path = os.path.join(self.share.path, "a")
        mdid = self.fsm.create(path, self.share.id, is_dir=True)
        self.fsm.set_node_id(path, "uuid")

        # break the metadata
        real_mdobj = self.fsm.fs[mdid]
        real_mdobj["info"]["is_partial"] = False
        real_mdobj["server_hash"] = "different-than-local"
        self.fsm.fs[mdid] = real_mdobj

        def check(_):
            '''checks everything's ok'''
            self.assertEqual(self.eq.pushed, [])
            self.assertFalse(self.fsm.has_metadata(path=path))

        self.startTest(check)
        return self.deferred


def test_suite():
    # pylint: disable-msg=C0111
    return unittest.TestLoader().loadTestsFromName(__name__)

if __name__ == "__main__":
    unittest.main()

Generated by  Doxygen 1.6.0   Back to index