Logo Search packages:      
Sourcecode: ubuntuone-client version File versions

main.py

# ubuntuone.syncdaemon.main - main SyncDaemon innards
#
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
#         John Lenton <john.lenton@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
""" SyncDaemon Main"""
import dbus
import dbus.mainloop.glib
import gnomekeyring
import logging
import os
import sys

from twisted.internet import defer, reactor, task

from ubuntuone.storageprotocol import oauth
from ubuntuone.syncdaemon import (
    action_queue,
    dbus_interface,
    event_queue,
    filesystem_manager,
    hash_queue,
    local_rescan,
    states,
    sync,
    volume_manager,
)
from ubuntuone.syncdaemon.state import SyncDaemonStateManager


class Main(object):
    """The one who executes the syncdaemon.

    Builds every syncdaemon component (volume manager, filesystem manager,
    event queue, action queue, hash queue, sync, local rescan and the DBus
    interface), wires them together and offers the high level operations
    (start, shutdown, authentication, rescans) used to drive them.
    """

    def __init__(self, root_dir, shares_dir, data_dir,
                 host='fs-1.ubuntuone.com', port=443, dns_srv=None, ssl=True,
                 disable_ssl_verify=False,
                 realm='https://ubuntuone.com', glib_loop=False,
                 mark_interval=120):
        """Create the instance and all of its collaborators.

        root_dir, shares_dir and data_dir are stored as given; data_dir is
        also handed to the FileSystemManager.
        host, port, dns_srv, ssl and disable_ssl_verify are stored and passed
        on to the ActionQueue.
        realm is stored and used to build the OAuthClient.
        glib_loop, when true, makes the DBus session bus use a
        DBusGMainLoop installed as the default main loop.
        mark_interval is the interval given to the LoopingCall that
        periodically calls log_mark.
        """
        self.root_dir = root_dir
        self.shares_dir = shares_dir
        self.data_dir = data_dir
        self.logger = logging.getLogger('ubuntuone.SyncDaemon.Main')
        self.host = host
        self.port = port
        self.dns_srv = dns_srv
        self.ssl = ssl
        self.disable_ssl_verify = disable_ssl_verify
        self.realm = realm
        self.token = None

        self.vm = volume_manager.VolumeManager(self)
        self.fs = filesystem_manager.FileSystemManager(data_dir, self.vm)
        self.event_q = event_queue.EventQueue(self.fs)
        self.oauth_client = OAuthClient(self.realm)
        self.state = SyncDaemonStateManager(self)
        # subscribe VM to EQ
        self.event_q.subscribe(self.vm)
        self.vm.init_root()
        # we don't have the oauth tokens yet, we'll get them later
        self.action_q = action_queue.ActionQueue(self.event_q, host, port,
                                                 self.dns_srv, ssl,
                                                 disable_ssl_verify)
        self.hash_q = hash_queue.HashQueue(self.event_q)

        self.sync = sync.Sync(self)
        self.lr = local_rescan.LocalRescan(self.vm, self.fs, self.event_q)

        if not glib_loop:
            self.bus = dbus.SessionBus()
        else:
            loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
            self.bus = dbus.SessionBus(loop)
        self.dbus_iface = dbus_interface.DBusInterface(self.bus, self)
        self.logger.info("Using %s as root dir", self.root_dir)
        self.logger.info("Using %s as data dir", self.data_dir)
        self.logger.info("Using %s as shares root dir", self.shares_dir)
        # periodic "mark" in the log, stopped again in shutdown()
        self.mark = task.LoopingCall(self.log_mark)
        self.mark.start(mark_interval)

    def log_mark(self):
        """Log a "mark" that includes the current state and queue sizes."""
        # arguments are handed to the logger so the message is only built
        # when the record is actually emitted
        self.logger.info("%s %s (state: %s; queues: metadata: %d; content: %d;"
                         " hash: %d) %s", '-'*4, 'MARK', self.state.name,
                         len(self.action_q.meta_queue),
                         len(self.action_q.content_queue),
                         len(self.hash_q), '-'*4)

    def wait_for_nirvana(self, last_event_interval=0.5):
        """Get a deferred that will fire when there are no more
        events or transfers.

        The check is registered with the event queue only after
        last_event_interval has elapsed (via reactor.callLater). The
        returned deferred is fired with True.
        """
        self.logger.debug('wait_for_nirvana(%s)', last_event_interval)
        d = defer.Deferred()

        def callback():
            """Event queue is empty: fire if everything else is idle too."""
            if not (self.state == states.IDLE
                    and self.hash_q.empty()):
                self.logger.debug("I can't attain Nirvana yet."
                                  " [state: %s; queues:"
                                  " metadata: %d; content: %d; hash: %d]",
                                  self.state.name,
                                  len(self.action_q.meta_queue),
                                  len(self.action_q.content_queue),
                                  len(self.hash_q))
                # stay registered; we get asked again on the next
                # empty-queue notification
                return
            self.logger.debug("Nirvana reached!! I'm a Buddha")
            self.event_q.remove_empty_event_queue_callback(callback)
            d.callback(True)

        def start():
            """Request the event queue empty notification."""
            self.logger.debug('starting wait_for_nirvana')
            self.event_q.add_empty_event_queue_callback(callback)

        reactor.callLater(last_event_interval, start)
        return d

    def start(self):
        """Set up the daemon to be ready to run.

        Pushes SYS_WAIT_FOR_LOCAL_RESCAN, watches the root dir and runs the
        local rescan. Once the rescan is done and the hash queue has
        drained, SYS_LOCAL_RESCAN_DONE is pushed; if the rescan fails,
        SYS_UNKNOWN_ERROR is pushed instead.

        Returns the local rescan deferred with those handlers attached.
        """
        # hook the event queue to the root dir
        self.event_q.push('SYS_WAIT_FOR_LOCAL_RESCAN')
        self.event_q.inotify_add_watch(self.root_dir)

        # do the local rescan
        self.logger.info("Local rescan starting...")
        d = self.lr.start()

        def _wait_for_hashq():
            """Keep on calling this until the hash_q finishes."""
            if len(self.hash_q):
                self.logger.info("hash queue pending. Waiting for it...")
                # poll again shortly
                reactor.callLater(.1, _wait_for_hashq)
            else:
                self.logger.info("hash queue empty. We are ready!")
                # nudge the action queue into action
                self.event_q.push('SYS_LOCAL_RESCAN_DONE')

        def local_rescan_done(_):
            """After local rescan finished."""
            self.logger.info("Local rescan finished!")
            _wait_for_hashq()

        def stop_the_press(failure):
            """Something went wrong in LR, can't continue."""
            self.logger.error("Local rescan finished with error: %s",
                              failure.getBriefTraceback())
            self.event_q.push('SYS_UNKNOWN_ERROR')

        d.addCallbacks(local_rescan_done, stop_the_press)
        return d

    def shutdown(self):
        """Shutdown the daemon.

        Pushes SYS_DISCONNECT, shuts down the event queue and the DBus
        interface, and stops the periodic log mark.
        """
        self.event_q.push('SYS_DISCONNECT')
        self.event_q.shutdown()
        self.dbus_iface.shutdown()
        # LoopingCall.stop() asserts that the loop is running, so only stop
        # it when it is; this keeps a repeated shutdown() from blowing up
        if self.mark.running:
            self.mark.stop()

    def get_root(self, root_mdid):
        """Ask the AQ for our root's uuid.

        If the action queue has no client yet, the request is chained on
        the action queue deferred and that deferred is returned; otherwise
        the request is issued right away and its deferred is returned.
        """
        def _worker(_=None):
            """Actually do the asking.

            Accepts (and ignores) one argument so it can be used directly
            as a deferred callback, which is always called with the
            previous result.
            """
            d = self.action_q.get_root(root_mdid)

            def root_node_cb(root):
                """Root node fetched callback."""
                mdid = self.vm.on_server_root(root)
                self.action_q.uuid_map.set(mdid, root)
            d.addCallback(root_node_cb)
            return d

        if self.action_q.client is None:
            # aq not yet connected
            self.action_q.deferred.addCallback(_worker)
            return self.action_q.deferred
        else:
            return _worker()

    def check_version(self):
        """Check the client protocol version matches that of the server.

        Pushes SYS_PROTOCOL_VERSION_OK on success. On failure pushes
        SYS_PROTOCOL_VERSION_ERROR when the failure message is
        'UNSUPPORTED_VERSION', and SYS_UNKNOWN_ERROR otherwise. Nothing is
        returned.
        """
        d = self.action_q.client.protocol_version()

        def protocol_callback(_):
            """Protocol check was OK."""
            self.logger.info("Protocol version OK")
            self.event_q.push('SYS_PROTOCOL_VERSION_OK')

        def protocol_errback(failure):
            """Protocol check was *not* OK."""
            self.logger.error("Protocol version error")
            self.logger.debug('traceback follows:\n\n%s',
                              failure.getTraceback())
            if failure.value.message == 'UNSUPPORTED_VERSION':
                self.event_q.push('SYS_PROTOCOL_VERSION_ERROR',
                                  error=failure.getErrorMessage())
            else:
                self.event_q.push('SYS_UNKNOWN_ERROR')
            # it looks like we won't be authenticating, so hook up the
            # for-testing action_queue deferred now
            d.chainDeferred(self.action_q.deferred)
            return failure

        d.addCallbacks(protocol_callback, protocol_errback)

    def authenticate(self):
        """Do the OAuth dance.

        Authenticates the action queue client with the OAuth consumer and
        the action queue token. Pushes SYS_OAUTH_OK or SYS_OAUTH_ERROR and
        chains the outcome into the action queue deferred: the client on
        success, the failure otherwise. Nothing is returned.
        """
        d = self.action_q.client.oauth_authenticate(self.oauth_client.consumer,
                                                    self.action_q.token)

        def oauth_errback(failure):
            """OAuth failed."""
            self.logger.error("OAuth failed: %s", failure)
            self.event_q.push('SYS_OAUTH_ERROR', error=failure)
            return failure

        def oauth_callback(_):
            """OAuth succeeded."""
            self.logger.info("Oauth OK")
            self.event_q.push('SYS_OAUTH_OK')
            return self.action_q.client

        d.addCallbacks(oauth_callback, oauth_errback)
        d.chainDeferred(self.action_q.deferred)

    def server_rescan(self):
        """Do the server rescan.

        Fetches the root, pushes SYS_SERVER_RESCAN_STARTING, queries the
        server with the data the filesystem manager provides for the
        rescan, and finally pushes SYS_SERVER_RESCAN_DONE.

        Returns the deferred driving those steps.
        """
        # a fresh object() is passed as the root mdid
        d = self.get_root(object())
        d.addCallback(lambda _: self.event_q.push('SYS_SERVER_RESCAN_STARTING'))
        d.addCallback(lambda _: self.action_q.client.query(
                self.fs.get_data_for_server_rescan()))
        d.addCallback(lambda _: self.event_q.push('SYS_SERVER_RESCAN_DONE'))
        return d

    def set_oauth_token(self, key, secret):
        """Set the oauth token, built from key and secret."""
        self.token = oauth.OAuthToken(key, secret)

    def get_access_token(self):
        """Return the access token or a new one.

        The token stored on this instance wins; when there is none, the
        OAuth client is asked for one.
        """
        if self.token:
            return self.token
        else:
            return self.oauth_client.get_access_token()

    def get_rootdir(self):
        """Return the base dir/mount point: the parent of root_dir."""
        return os.path.dirname(self.root_dir)

    def quit(self, exit_value=0):
        """Shutdown and stop the reactor.

        When the reactor is not running the process exits with exit_value
        instead; exit_value is not used when the reactor is stopped.
        """
        self.shutdown()
        if reactor.running:
            reactor.stop()
        else:
            sys.exit(exit_value)


class NoAccessToken(Exception):
    """No access token available.

    Raised when an access token cannot be obtained from the keyring.
    """


class OAuthClient(object):
    """Basic OAuth client, just grab the token from the keyring."""

    def __init__(self, realm, consumer_key='ubuntuone',
                 consumer_secret='hammertime'):
        """Create the instance.

        realm is stored and later used as the 'ubuntuone-realm' attribute
        of the keyring lookup; consumer_key and consumer_secret are used to
        build the OAuth consumer.
        """
        self.realm = realm
        self.consumer = oauth.OAuthConsumer(consumer_key, consumer_secret)

    def get_access_token(self):
        """Get the access token from the keyring.

        The keyring is searched for a generic secret matching this client's
        realm and consumer key; the secret of the first item found is
        parsed into an OAuth token and returned.

        If no token is available in the keyring, `NoAccessToken` is raised.
        The same exception is raised when there is no keyring daemon or
        access to the keyring is denied.
        """
        # only the keyring lookup itself sits inside the try, so the
        # handlers below deal with keyring errors and nothing else
        try:
            items = gnomekeyring.find_items_sync(
                gnomekeyring.ITEM_GENERIC_SECRET,
                {'ubuntuone-realm': self.realm,
                 'oauth-consumer-key': self.consumer.key})
        except gnomekeyring.NoMatchError:
            raise NoAccessToken("No access token found.")
        except (gnomekeyring.NoKeyringDaemonError,
                gnomekeyring.DeniedError) as e:
            raise NoAccessToken("Error while getting the token: %s" % type(e))
        return oauth.OAuthToken.from_string(items[0].secret)



Generated by  Doxygen 1.6.0   Back to index