# ubuntuone.syncdaemon.action_queue - Action queue
#
# Author: John Lenton <john.lenton@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
The ActionQueue is where actions to be performed on the server are
queued up and then executed. The idea is that there are two queues,
one for metadata and another for content; the metadata queue has
priority over the content queue.
"""
from collections import deque, defaultdict
from functools import wraps, partial
import logging
import os
import random
import tempfile
import uuid
import zlib

from zope.interface import implements
from twisted.internet import reactor, defer, ssl
from twisted.names import client as dns_client
from twisted.python.failure import Failure

from ubuntuone.storageprotocol.context import get_ssl_context
from ubuntuone.storageprotocol import protocol_pb2
from ubuntuone.storageprotocol.client import StorageClient, \
    StorageClientFactory
from ubuntuone.syncdaemon.logger import mklog, TRACE
from ubuntuone.syncdaemon.interfaces import IActionQueue, \
    IMarker

logger = logging.getLogger("ubuntuone.SyncDaemon.ActionQueue")

# I want something whose repr() is "---" *without* the quotes :)
UNKNOWN = type('', (), {'__repr__': lambda _: '---'})()
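
# Illustrative: UNKNOWN is used below as a placeholder argument to
# mklog() when a share or node id isn't known, e.g.:
#
#     >>> UNKNOWN
#     ---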

def passit(func):
    """
    Pass the value on for the next deferred, while calling func with
    it.
    """
    @wraps(func)
    def wrapper(a):
        """
        Do it.
        """
        func(a)
        return a
    return wrapper
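
# Illustrative use of passit(): observe a deferred's result without
# consuming it.
#
#     d = defer.succeed(42)
#     d.addCallback(passit(lambda v: logger.debug('saw %r', v)))
#     d.addCallback(lambda v: v + 1)   # this callback still gets 42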


class UploadCompressionCancelled(Exception):
    """Compression of a file for upload cancelled."""


class RequestCleanedUp(Exception):
    """
    The request was cancelled by ActionQueue.cleanup()
    """


class NamedTemporaryFile(object):
    """
    Like tempfile.NamedTemporaryFile, but working in 2.5 also WRT the
    delete argument. Actually, one of these NamedTemporaryFile()s is
    the same as a tempfile.NamedTemporaryFile(delete=False) from 2.6.

    Or so the theory goes.
    """
    def __init__(self):
        fileno, self.name = tempfile.mkstemp()
        self._fd = os.fdopen(fileno, 'r+b')

    def __getattr__(self, attr):
        """proxy everything else (other than .name) on to self._fd"""
        return getattr(self._fd, attr)
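
# Illustrative: unlike the stdlib default, the file survives close(),
# so callers are responsible for unlinking it (as Upload does below).
#
#     f = NamedTemporaryFile()
#     f.write('some bytes')
#     f.close()
#     assert os.path.exists(f.name)
#     os.unlink(f.name)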


class MultiProxy(list):
    """
    Proxy many objects of the same kind, like this:

    >>> m = MultiProxy(['foo', 'bar', 'baz'])
    >>> m.upper()
    MultiProxy(['FOO', 'BAR', 'BAZ'])
    """

    def __getattr__(self, attr):
        return MultiProxy(getattr(i, attr) for i in self)

    def __call__(self, *args, **kwargs):
        return MultiProxy(i(*args, **kwargs) for i in self)

    def __repr__(self):
        return 'MultiProxy(%s)' % (super(MultiProxy, self).__repr__(),)


class LoggingStorageClient(StorageClient):
    """ A subclass of StorageClient that adds logging to
    processMessage and sendMessage.
    """

    def __init__(self):
        """ setup logging and create the instance. """
        StorageClient.__init__(self)
        self.log = logging.getLogger('ubuntuone.SyncDaemon.StorageClient')
        # log at TRACE, a level below DEBUG
        self.log.setLevel(TRACE)
        self.log.debug = partial(self.log.log, TRACE)

    def processMessage(self, message):
        """ wrapper that logs the message and result. """
        # don't log the full message if it's of type BYTES
        if message.type == protocol_pb2.Message.BYTES:
            self.log.debug('start - processMessage: id: %s, type: %s',
                           message.id, message.type)
        else:
            self.log.debug('start - processMessage: %s',
                          str(message).replace("\n", " "))
        if message.id in self.requests:
            req = self.requests[message.id]
            req.deferred.addCallbacks(self.log_success, self.log_error)
        result = StorageClient.processMessage(self, message)
        self.log.debug('end - processMessage: id: %s - result: %s',
                       message.id, result)
        return result

    def log_error(self, failure):
        """ logging errback for requests """
        self.log.debug('request error: %s', failure)
        return failure

    def log_success(self, result):
        """ logging callback for requests """
        self.log.debug('request finished: %s', result)
        if getattr(result, '__dict__', None):
            self.log.debug('result.__dict__: %s', result.__dict__)
        return result

    def sendMessage(self, message):
        """ wrapper that logs the message and result. """
        # don't log the full message if it's of type BYTES
        if message.type == protocol_pb2.Message.BYTES:
            self.log.debug('start - sendMessage: id: %s, type: %s',
                           message.id, message.type)
        else:
            self.log.debug('start - sendMessage: %s',
                          str(message).replace("\n", " "))
        result = StorageClient.sendMessage(self, message)
        self.log.debug('end - sendMessage: id: %s', message.id)
        return result


class ActionQueueProtocol(LoggingStorageClient):
    """
    This is the Action Queue version of the StorageClient protocol.
    """
    connection_state = 'disconnected'
    factory = None

    def connectionMade(self):
        """
        Called when a new connection is made.
        All the state is saved in the factory.
        """
        logger.debug("connection made")
        self.connection_state = 'connected'
        self.factory.client = self
        self.set_node_state_callback(self.factory._node_state_callback)
        self.set_share_change_callback(self.factory._share_change_callback)
        self.set_share_answer_callback(self.factory._share_answer_callback)
        self.factory.event_queue.push('SYS_CONNECTION_MADE')

    def disconnect(self):
        """
        Close down the sockets
        """
        logger.debug("disconnected")
        if self.transport is not None:
            self.transport.loseConnection()

    def connectionLost(self, failure):
        """
        The connection went down, for some reason (which might or
        might not be described in failure).
        """
        logger.warning('connection lost: %s' % failure.getErrorMessage())
        self.factory.event_queue.push('SYS_CONNECTION_LOST')
        LoggingStorageClient.connectionLost(self, failure)


class Marker(str):
    """
    A uuid4-based marker class
    """
    implements(IMarker)
    def __new__(cls):
        return super(Marker, cls).__new__(cls, uuid.uuid4())
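
# Illustrative: a Marker stands in for a server-side id that isn't
# known yet; demark() and DeferredMap (below) resolve it later.
#
#     marker = Marker()
#     IMarker.providedBy(marker)    # True
#     IMarker.providedBy('abcdef')  # False: plain values pass through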


class ZipQueue(object):
    """
    A queue of files to be compressed for upload

    Parts of this were shamelessly copied from
    twisted.internet.defer.DeferredSemaphore

    see bug #373984
    """
    def __init__(self):
        self.waiting = deque()
        self.tokens = self.limit = 10

    def acquire(self):
        """
        return a deferred which fires on token acquisition.
        """
        assert self.tokens >= 0, "Tokens should never be negative"
        d = defer.Deferred()
        if not self.tokens:
            self.waiting.append(d)
        else:
            self.tokens = self.tokens - 1
            d.callback(self)
        return d

    def release(self):
        """
        Release the token.

        Should be called by whoever did the acquire() when the shared
        resource is free.
        """
        assert self.tokens < self.limit, "Too many tokens!"
        self.tokens = self.tokens + 1
        if self.waiting:
            # someone is waiting to acquire token
            self.tokens = self.tokens - 1
            d = self.waiting.popleft()
            d.callback(self)
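
    # Illustrative: the acquire()/release() pair bounds concurrent
    # compressions to self.limit, like twisted's DeferredSemaphore.
    # do_compression() here is hypothetical:
    #
    #     q = ZipQueue()
    #     d = q.acquire()
    #     d.addCallback(lambda _: do_compression())
    #     d.addBoth(passit(lambda _: q.release()))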

    def _compress(self, deferred, upload):
        """Compression background task."""
        try:
            fileobj = upload.fileobj_factory()
        except StandardError:
            # presumably the user deleted the file before we got to
            # upload it. Logging a warning just in case.
            upload.log.warn('unable to build fileobj'
                            ' (user deleted the file, maybe?)'
                            ' so cancelling the upload.')
            upload.cancel()
            fileobj = None

        filename = getattr(fileobj, 'name', '<?>')

        upload.log.debug('compressing %s', filename)
        try:
            # we need to compress the file completely to figure out its
            # compressed size. So streaming is out :(
            if upload.tempfile_factory is None:
                f = NamedTemporaryFile()
            else:
                f = upload.tempfile_factory()
            zipper = zlib.compressobj()
            while not upload.cancelled:
                data = fileobj.read(4096)
                if not data:
                    f.write(zipper.flush())
                    # no flush/sync because we don't need this to persist
                    # on disk; if the machine goes down, we'll lose it
                    # anyway (being in /tmp and all)
                    break
                f.write(zipper.compress(data))
            if upload.cancelled:
                raise UploadCompressionCancelled("Cancelled")
            upload.deflated_size = f.tell()
            # close the compressed file (thus, if you actually want to stream
            # it out, it must have a name so it can be reopened)
            f.close()
            upload.tempfile = f
        except Exception, e: # pylint: disable-msg=W0703
            reactor.callFromThread(deferred.errback, e)
        else:
            reactor.callFromThread(deferred.callback, True)

    def zip(self, upload):
        """
        Acquire, do the compression in a thread, release.
        """
        d_zip = defer.Deferred()
        d_lck = self.acquire()
        d_lck.addCallback(
            lambda _: reactor.callInThread(self._compress,
                                           d_zip, upload) or d_zip)
        d_lck.addCallback(lambda _: self.release())

        return d_lck


class RequestQueue(object):
    """
    RequestQueue is a queue that ensures that there is at most one
    request at a time 'on the wire', and that uses the action queue's
    states for its synchronization.
    """
    def __init__(self, name, action_queue):
        super(RequestQueue, self).__init__()
        self.name = name
        self.action_queue = action_queue
        self.waiting = deque()
        self.head = None
        self.paused = True

    def __len__(self):
        """return the length of the queue"""
        return len(self.waiting)

    def queue(self, func, *args, **kwargs):
        """
        Add a call to func to the queue.
        """
        d = defer.Deferred()
        d.addCallback(lambda _: defer.maybeDeferred(func, *args, **kwargs))
        self.waiting.append(d)
        if len(self.waiting) == 1 and not self.head:
            self.action_queue.event_queue.push('SYS_' + self.name
                                               + '_WAITING')
        return d

    def run(self):
        """
        Empty the queue.
        """
        if self.waiting:
            d = self.waiting.popleft()
            d.addBoth(passit(lambda _: self.run()))
            d.callback(None)
        else:
            self.action_queue.event_queue.push('SYS_' + self.name + '_DONE')
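
    # Illustrative: queue() parks calls, run() fires them one at a
    # time; run() is driven from outside, in response to the
    # SYS_*_WAITING events pushed above. some_client_call is
    # hypothetical:
    #
    #     rq = RequestQueue('META_QUEUE', action_queue)
    #     d = rq.queue(some_client_call, arg)
    #     rq.run()   # empties the queue, firing each deferred in turn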

class ContentQueue(RequestQueue):
    """
    A content queue is a queue of content requests (uploads and downloads).
    """
    def run(self):
        """
        Empty the queue.
        """
        if self.waiting:
            d = self.waiting.popleft()
            d.addBoth(passit(lambda _: self.action_queue.event_queue.push(
                        'SYS_' + self.name + '_DONE')))
            d.addBoth(passit(lambda _: self.action_queue.event_queue.push(
                        'SYS_' + self.name + '_WAITING')))
            d.callback(None)
        else:
            self.action_queue.event_queue.push('SYS_' + self.name + '_DONE')


class MetaQueue(RequestQueue):
    """
    A meta queue is a queue of metadata-related requests.
    """


class DeferredMap(object):
    """
    A mapping of deferred values. Or a deferred map of values. Or a
    mapping that returns deferreds and then fires them when it has the
    value.
    """
    def __init__(self):
        self.waiting = defaultdict(list)
        self.failed = {}
        self.map = {}

    def get(self, key):
        """
        Get the value for the given key.

        This always returns a deferred; when we already know the value
        we return a `succeed`, and if we don't know the value because
        it failed we return a `fail`; otherwise we return a plain
        unfired `Deferred`, and add it to the list of deferreds to
        call when we actually get the value.
        """
        if key in self.map:
            return defer.succeed(self.map[key])
        if key in self.failed:
            return defer.fail(Exception(self.failed[key]))
        d = defer.Deferred()
        self.waiting[key].append(d)
        return d

    def set(self, key, value):
        """
        We've got the value for a key! Write it down in the map, and
        fire the waiting deferreds.
        """
        if key not in self.map:
            self.map[key] = value
            for d in self.waiting.pop(key, ()):
                d.callback(value)
        elif self.map[key] != value:
            raise KeyError("key is taken -- dunno what to do")

    def err(self, key, failure):
        """
        Something went terribly wrong in the process of getting a
        value. Break the news to the waiting deferreds.
        """
        self.failed[key] = failure.getErrorMessage()
        for d in self.waiting.pop(key, ()):
            d.errback(failure)
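
    # Illustrative: get() before set() returns an unfired deferred
    # that fires once the value arrives.
    #
    #     m = DeferredMap()
    #     d = m.get('a-marker')            # unfired deferred
    #     d.addCallback(lambda v: logger.debug('resolved to %r', v))
    #     m.set('a-marker', 'a-node-id')   # d fires with 'a-node-id'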


class UploadProgressWrapper(object):
    """
    A wrapper around the file-like object used for Uploads, with which
    we can keep track of the number of bytes that have been written to
    the store.
    """
    def __init__(self, fd, data_dict):
        """
        fd is the file-like object used for uploads. data_dict is the
        entry in the uploading dictionary.
        """
        self.fd = fd
        self.data_dict = data_dict
        self.n_bytes_read = 0

    def read(self, size=None):
        """
        read at most size bytes from the file-like object.

        Keep track of the number of bytes that have been read, and the
        number of bytes that have been written (assumed to be equal to
        the number of bytes read on the previous call to read). The
        latter is done directly in the data_dict.
        """
        self.data_dict['n_bytes_written'] = self.n_bytes_read
        data = self.fd.read(size)
        self.n_bytes_read += len(data)
        return data

    def __getattr__(self, attr):
        """
        Proxy all the rest.
        """
        return getattr(self.fd, attr)
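
    # Illustrative: reads update the shared dict, so upload progress
    # can be inspected while the request consumes the file (the path
    # here is hypothetical):
    #
    #     info = {}
    #     f = UploadProgressWrapper(open('/tmp/example.zip'), info)
    #     f.read(4096)
    #     info['n_bytes_written']   # trails the reads by one call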


class ActionQueue(StorageClientFactory, object):
    """
    This is the ActionQueue itself.
    """
    implements(IActionQueue)
    protocol = ActionQueueProtocol

    def __init__(self, event_queue, host, port, dns_srv,
                 use_ssl=False, disable_ssl_verify=False):
        self.event_queue = event_queue
        self.host = host
        self.port = port
        self.dns_srv = dns_srv
        self.use_ssl = use_ssl
        self.disable_ssl_verify = disable_ssl_verify

        self.token = None
        self.client = None
        self.deferred = None

        self.content_queue = ContentQueue('CONTENT_QUEUE', self)
        self.meta_queue = MetaQueue('META_QUEUE', self)
        self.uuid_map = DeferredMap()
        self.zip_queue = ZipQueue()

        self.uploading = {}
        self.downloading = {}

        event_queue.subscribe(self)

    def handle_SYS_CONNECT(self, access_token):
        """
        Stow the access token away for later use
        """
        self.token = access_token

    def cleanup(self):
        """
        Cancel, clean up, and reschedule things that were in progress
        when a disconnection happened
        """
        self.event_queue.push('SYS_CLEANUP_STARTED')
        if self.client is not None:
            self.client.disconnect()
        for queue in self.meta_queue, self.content_queue:
            if queue.head is not None:
                queue.head.errback(RequestCleanedUp('Cleaned up'))
        self.event_queue.push('SYS_CLEANUP_FINISHED')


    def _node_state_callback(self, share_id, node_id, hash):
        """
        Called by the client when notified that node changed.
        """
        self.event_queue.push('SV_HASH_NEW',
                              share_id=share_id, node_id=node_id, hash=hash)

    def _share_change_callback(self, message, info):
        """
        Called by the client when notified that a share changed.
        """
        self.event_queue.push('SV_SHARE_CHANGED',
                              message=message, info=info)

    def _share_answer_callback(self, share_id, answer):
        """
        Called by the client when it gets a share answer notification.
        """
        self.event_queue.push('SV_SHARE_ANSWERED',
                              share_id=str(share_id), answer=answer)

    def _lookup_srv(self):
        """ do the SRV lookup and return a deferred whose callback is
        going to be called with (host, port). If we can't do the
        lookup, the default host and port are used.
        """
        def on_lookup_ok(results):
            """Get a random host from the SRV result."""
            logger.debug('SRV lookup done, choosing a server')
            records, auth, add = results
            if not records:
                raise ValueError("No available records.")
            # pick a random server
            record = random.choice(records)
            logger.debug('Using record: %r', record)
            if record.payload:
                return record.payload.target.name, record.payload.port
            else:
                logger.info('Empty SRV record, fallback to %r:%r',
                            self.host, self.port)
                return self.host, self.port

        def on_lookup_error(failure):
            """ return the default host/post on a DNS SRV lookup failure. """
            logger.info("SRV lookup error, fallback to %r:%r \n%s",
                        self.host, self.port, failure.getTraceback())
            return self.host, self.port

        if self.dns_srv:
            # lookup the DNS SRV records
            d = dns_client.lookupService(self.dns_srv, timeout=[3, 2])
            d.addCallback(on_lookup_ok)
            d.addErrback(on_lookup_error)
            return d
        else:
            return defer.succeed((self.host, self.port))

    def connect(self):
        """
        Start the circus going.
        """
        self.deferred = defer.Deferred()
        d = self._lookup_srv()
        def _connect(result):
            """ do the real thing """
            host, port = result
            sslContext = get_ssl_context(self.disable_ssl_verify)

            if self.use_ssl:
                reactor.connectSSL(host, port, self, sslContext)
            else:
                reactor.connectTCP(host, port, self)
        d.addCallback(_connect)
        return self.deferred

    def connectionFailed(self, reason=None):
        """
        Called when the connect() call fails
        """
        self.deferred.errback(reason)

    def get_root(self, marker):
        """
        Get the user's root uuid. Use the uuid_map, so the caller can
        use the marker in followup operations.
        """
        log = mklog(logger, 'get_root', '', marker, marker=marker)
        log.debug('starting')
        d = self.client.get_root()
        d.addCallbacks(*log.callbacks())
        d.addCallbacks(passit(lambda root: self.uuid_map.set(marker, root)),
                       passit(lambda f: self.uuid_map.err(marker, f)))

        return d

    def make_file(self, share_id, parent_id, name, marker):
        """
        See .interfaces.IMetaQueue
        """
        return MakeFile(self, share_id, parent_id, name, marker).start()

    def make_dir(self, share_id, parent_id, name, marker):
        """
        See .interfaces.IMetaQueue
        """
        return MakeDir(self, share_id, parent_id, name, marker).start()

    def move(self, share_id, node_id, old_parent_id, new_parent_id,
             new_name):
        """
        See .interfaces.IMetaQueue
        """
        return Move(self, share_id, node_id, old_parent_id,
                    new_parent_id, new_name).start()

    def unlink(self, share_id, parent_id, node_id):
        """
        See .interfaces.IMetaQueue
        """
        return Unlink(self, share_id, parent_id, node_id).start()

    def query(self, items):
        """
        See .interfaces.IMetaQueue
        """
        return Query(self, items).start()

    def list_shares(self):
        """
        List the shares; put the result on the event queue
        """
        return ListShares(self).start()

    def answer_share(self, share_id, answer):
        """
        Answer the offer of a share.
        """
        return AnswerShare(self, share_id, answer).start()

    def create_share(self, node_id, share_to, name, access_level, marker):
        """
        Share a node with somebody.
        """
        return CreateShare(self, node_id, share_to, name, access_level,
                           marker).start()

    def listdir(self, share_id, node_id, server_hash, fileobj_factory):
        """
        See .interfaces.IMetaQueue.listdir
        """
        return ListDir(self, share_id, node_id, server_hash,
                       fileobj_factory).start()

    def download(self, share_id, node_id, server_hash, fileobj_factory):
        """
        See .interfaces.IContentQueue.download
        """
        return Download(self, share_id, node_id, server_hash,
                        fileobj_factory).start()

    def upload(self, share_id, node_id, previous_hash, hash, crc32,
               size, fileobj_factory, tempfile_factory=None):
        """
        See .interfaces.IContentQueue
        """
        return Upload(self, share_id, node_id, previous_hash, hash,
                      crc32, size, fileobj_factory, tempfile_factory).start()

    def cancel_upload(self, share_id, node_id):
        """
        See .interfaces.IContentQueue
        """
        log = mklog(logger, 'cancel_upload', share_id, node_id,
                    share=share_id, node=node_id)
        log.debug('starting')
        if (share_id, node_id) in self.uploading:
            req = self.uploading[share_id, node_id].get('req')
            if req is not None:
                req.cancel()
                log.debug("cancelled")
        log.debug('finished')

    def cancel_download(self, share_id, node_id):
        """
        See .interfaces.IContentQueue
        """
        log = mklog(logger, 'cancel_download', share_id, node_id,
                    share=share_id, node=node_id)
        log.debug('starting')
        if (share_id, node_id) in self.downloading:
            req = self.downloading[share_id, node_id].get('req')
            if req is not None:
                req.cancel()
                log.debug("cancelled")
        log.debug('finished')


SKIP_THIS_ITEM = object()

# pylint: disable-msg=W0231

class ActionQueueCommand(object):
    """
    Base of all the action queue commands
    """

    # protobuf doesn't seem to have much introspectionable stuff
    # without going into private attributes
    known_error_messages = (set(protocol_pb2._ERROR_ERRORTYPE.values_by_name)
                            | set(['CANCELLED']))
    suppressed_error_messages = (known_error_messages
                                 - set(['INTERNAL_ERROR'])
                                 | set(['Cleaned up']))
    retryable_errors = set(['Cleaned up', 'TRY_AGAIN'])

    def demark(self, *maybe_markers):
        """
        Arrange to have maybe_markers realized
        """
        l = []
        for marker in maybe_markers:
            if IMarker.providedBy(marker):
                self.log.debug('waiting until we know the real value of %s'
                               % marker)
                d = self.action_queue.uuid_map.get(marker)
                # bind the current marker as a default argument, so the
                # callbacks log the marker they were created for
                d.addCallbacks(
                    passit(lambda _, marker=marker:
                               self.log.debug('got %s' % marker)),
                    passit(lambda f, marker=marker:
                               self.log.error('failed %s' % marker)))
            else:
                d = defer.succeed(marker)
            l.append(d)
        dl = defer.DeferredList(l, fireOnOneErrback=True, consumeErrors=True)
        dl.addCallbacks(self.unwrap,
                        lambda f: f.value.subFailure)
        return dl

    @staticmethod
    def unwrap(results):
        """
        Unpack the values from the result of a DeferredList. If
        there's a failure, return it instead.
        """
        values = []
        for result in results:
            # result can be none if one of the callbacks failed
            # before the others were ready
            if result is not None:
                is_ok, value = result
                if not is_ok:
                    # a failure!
                    return value
                if value is not SKIP_THIS_ITEM:
                    values.append(value)
        return values
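
    # Illustrative: unwrap() turns a DeferredList result back into a
    # plain list of values, or into the first failure found.
    #
    #     ActionQueueCommand.unwrap([(True, 'a'), (True, 'b')])
    #     # -> ['a', 'b']
    #     ActionQueueCommand.unwrap([(True, 'a'), (False, failure)])
    #     # -> the failure itself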


    def end_callback(self, arg):
        """
        It worked!
        """
        self._queue.head = None
        self.log.debug('success')
        return self.handle_success(arg)

    def end_errback(self, failure):
        """
        It failed!
        """
        self._queue.head = None
        error_message = failure.getErrorMessage()
        if error_message not in self.suppressed_error_messages:
            self.log.error('failure', error_message)
            self.log.debug('traceback follows:\n\n' + failure.getTraceback())
        else:
            self.log.warn('failure', error_message)
        self.cleanup()
        if error_message in self.retryable_errors:
            reactor.callLater(0.1, self.retry)
        else:
            return self.handle_failure(failure)

    def start(self, _=None):
        """
        Get ready to run, and then run.

        The default implementation is for commands that need no
        preparation.
        """
        d = self._start()
        d.addCallback(self.store_marker_result)
        d.addCallback(lambda _: self.log.debug('queueing in the %s'
                                               % self._queue.name))
        d.addCallbacks(lambda _: self._queue.queue(self.run),
                       self.handle_failure)
        return d

    def cleanup(self):
        """
        Do whatever is needed to clean up from a failure (such as stop
        producers and other such that aren't cleaned up appropriately
        on their own)

        The default implementation does absolutely nothing.
        """

    def _start(self):
        """
        Do the specialized pre-run setup
        """
        return defer.succeed(None)

    def store_marker_result(self, _):
        """
        Called when all the markers are realized.
        """

    def run(self):
        """
        Do the deed.
        """
        self.log.debug('starting')
        self._queue.head = d = self._run()
        d.addCallbacks(self.end_callback, self.end_errback)
        return d

    def handle_success(self, success):
        """
        Do anything that's needed to handle success of the operation.
        """
        return success

    def handle_failure(self, failure):
        """
        Do anything that's needed to handle failure of the operation.
        Note that cancellation and TRY_AGAIN are already handled.
        """
        return failure

    def retry(self):
        """
        Request cancelled or TRY_AGAIN. Well then, try again!
        """
        return self._queue.queue(self.run)
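
    # Illustrative lifecycle of a command, tying the above together:
    #
    #     cmd = MakeDir(action_queue, share_id, parent_id, 'name', marker)
    #     d = cmd.start()   # _start() realizes markers, then queues run()
    #     # once the queue reaches it: run() -> _run(), then
    #     # end_callback()/end_errback(), which dispatch to
    #     # handle_success(), handle_failure() or retry()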



class ActionQueueMetaCommand(ActionQueueCommand):
    """
    Base of metadata-related commands (commands that are queued in the
    meta queue)
    """
    @property
    def _queue(self):
        """
        Get at the meta queue
        """
        return self.action_queue.meta_queue


class ActionQueueContentCommand(ActionQueueCommand):
    """
    Base of content-related commands (commands that are queued in the
    content queue)
    """
    @property
    def _queue(self):
        """
        Get at the content queue
        """
        return self.action_queue.content_queue


class MakeThing(ActionQueueMetaCommand):
    """
    Base of MakeFile and MakeDir
    """
    def __init__(self, action_queue, share_id, parent_id, name, marker):
        self.action_queue = action_queue
        self.share_id = share_id
        self.parent_id = parent_id
        # Unicode boundary! the name is Unicode in protocol and server, but
        # here we use bytes for paths
        self.name = name.decode("utf8")
        self.marker = marker
        self.log = mklog(logger, self.__class__.__name__, share_id, marker,
                         share=share_id, parent=parent_id, name=name,
                         marker=marker)

    def _start(self):
        """
        Do the specialized pre-run setup
        """
        return self.demark(self.share_id, self.parent_id)

    def store_marker_result(self, (share_id, parent_id)):
        """
        Called when all the markers are realized.
        """
        self.share_id = share_id
        self.parent_id = parent_id

    def _run(self):
        """
        Do the actual running
        """
        maker = getattr(self.action_queue.client, self.client_method)
        return maker(self.share_id,
                     self.parent_id,
                     self.name)

    def handle_success(self, success):
        """
        It worked! Push the event, and update the uuid map.
        """
        # note that we're not getting the new name from the answer
        # message, if we would get it, we would have another Unicode
        # boundary with it
        self.action_queue.event_queue.push(self.ok_event_name,
                                           marker=self.marker,
                                           new_id=success.new_id)
        if IMarker.providedBy(self.marker):
            self.action_queue.uuid_map.set(self.marker, success.new_id)
        return success

    def handle_failure(self, failure):
        """
        It didn't work! Push the event, and update the uuid map.
        """
        self.action_queue.event_queue.push(self.error_event_name,
                                           marker=self.marker,
                                           error=failure.getErrorMessage())
        if IMarker.providedBy(self.marker):
            self.action_queue.uuid_map.err(self.marker,
                                           failure)


class MakeFile(MakeThing):
    """
    Make a file
    """
    ok_event_name = 'AQ_FILE_NEW_OK'
    error_event_name = 'AQ_FILE_NEW_ERROR'
    client_method = 'make_file'


class MakeDir(MakeThing):
    """
    Make a directory
    """
    ok_event_name = 'AQ_DIR_NEW_OK'
    error_event_name = 'AQ_DIR_NEW_ERROR'
    client_method = 'make_dir'


class Move(ActionQueueMetaCommand):
    """
    Move a file or directory
    """
    def __init__(self, action_queue, share_id, node_id, old_parent_id,
                 new_parent_id, new_name):
        self.action_queue = action_queue
        self.share_id = share_id
        self.node_id = node_id
        self.old_parent_id = old_parent_id
        self.new_parent_id = new_parent_id
        # Unicode boundary! the name is Unicode in protocol and server, but
        # here we use bytes for paths
        self.new_name = new_name.decode("utf8")
        self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
                         share=share_id, node=node_id,
                         old_parent=old_parent_id, new_parent=new_parent_id,
                         new_name=new_name)

    def _start(self):
        """
        Do the specialized pre-run setup
        """
        return self.demark(self.share_id, self.node_id, self.new_parent_id)

    def store_marker_result(self, (share_id, node_id, new_parent_id)):
        """
        Called when all the markers are realized.
        """
        self.share_id = share_id
        self.node_id = node_id
        self.new_parent_id = new_parent_id

    def _run(self):
        """
        Do the actual running
        """
        return self.action_queue.client.move(self.share_id,
                                             self.node_id,
                                             self.new_parent_id,
                                             self.new_name)

    def handle_success(self, success):
        """
        It worked! Push the event.
        """
        self.action_queue.event_queue.push('AQ_MOVE_OK',
                                           share_id=self.share_id,
                                           node_id=self.node_id)
        return success

    def handle_failure(self, failure):
        """
        It didn't work! Push the event.
        """
        self.action_queue.event_queue.push('AQ_MOVE_ERROR',
                                           error=failure.getErrorMessage(),
                                           share_id=self.share_id,
                                           node_id=self.node_id,
                                           old_parent_id=self.old_parent_id,
                                           new_parent_id=self.new_parent_id,
                                           new_name=self.new_name)


class Unlink(ActionQueueMetaCommand):
    """
    Unlink a file or dir
    """
    def __init__(self, action_queue, share_id, parent_id, node_id):
        self.action_queue = action_queue
        self.share_id = share_id
        self.node_id = node_id
        self.parent_id = parent_id
        self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
                         share=share_id, node=node_id, parent=parent_id)

    def _start(self):
        """
        Do the specialized pre-run setup
        """
        return self.demark(self.share_id, self.node_id, self.parent_id)

    def store_marker_result(self, (share_id, node_id, parent_id)):
        """
        Called when all the markers are realized.
        """
        self.share_id = share_id
        self.node_id = node_id
        self.parent_id = parent_id

    def _run(self):
        """
        Do the actual running
        """
        return self.action_queue.client.unlink(self.share_id, self.node_id)

    def handle_success(self, success):
        """
        It worked! Push the event.
        """
        self.action_queue.event_queue.push('AQ_UNLINK_OK',
                                           share_id=self.share_id,
                                           parent_id=self.parent_id,
                                           node_id=self.node_id)
        return success

    def handle_failure(self, failure):
        """
        It didn't work! Push the event.
        """
        self.action_queue.event_queue.push('AQ_UNLINK_ERROR',
                                           error=failure.getErrorMessage(),
                                           share_id=self.share_id,
                                           parent_id=self.parent_id,
                                           node_id=self.node_id)


class Query(ActionQueueMetaCommand):
    """
    Ask about the freshness of server hashes
    """
    def __init__(self, action_queue, items):
        self.log = MultiProxy(
            [mklog(logger, '(unrolled) query', share, node,
                   share=share, node=node, hash=hash, index=i)
             for (i, (share, node, hash)) in enumerate(items)])
        self.action_queue = action_queue
        self.items = items

    def store_marker_result(self, items):
        """
        Called when all the markers are realized.
        """
        self.items = items

    def _start(self):
        """
        Do the specialized pre-run setup
        """
        # node_hash will (should?) never be a marker, but it's the
        # easiest way to keep the trio together: send it along for the
        # trip
        dl = []
        for item in self.items:
            d = self.demark(*item)
            d.addErrback(self.handle_single_failure, item)
            dl.append(d)
        d = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
        d.addCallback(self.unwrap)
        return d

    def handle_failure(self, failure):
        """
        It didn't work! Never mind.
        """
        pass

    def handle_single_failure(self, failure, item):
        """
        The only failure mode of a Query is for a query to be done
        using a marker that fails to realize.
        """
        self.action_queue.event_queue.push('AQ_QUERY_ERROR', item=item,
                                           error=failure.getErrorMessage())
        return SKIP_THIS_ITEM

    def _run(self):
        """
        Do the actual running
        """
        return self.action_queue.client.query(self.items)


class ListShares(ActionQueueMetaCommand):
    """
    List shares shared to me
    """
    def __init__(self, action_queue):
        self.action_queue = action_queue
        self.log = mklog(logger, 'list_shares', UNKNOWN, UNKNOWN)

    def _run(self):
        """
        Do the actual running
        """
        return self.action_queue.client.list_shares()

    def handle_success(self, success):
        """
        It worked! Push the event.
        """
        self.action_queue.event_queue.push('AQ_SHARES_LIST',
                                           shares_list=success)

    def handle_failure(self, failure):
        """
        It didn't work! Push the event.
        """
        self.action_queue.event_queue.push('AQ_LIST_SHARES_ERROR',
                                           error=failure.getErrorMessage())


class AnswerShare(ActionQueueMetaCommand):
    """
    Answer a share offer
    """
    def __init__(self, action_queue, share_id, answer):
        self.action_queue = action_queue
        self.share_id = share_id
        self.answer = answer
        self.log = mklog(logger, 'answer_share', share_id, UNKNOWN)

    def _run(self):
        """
        Do the actual running
        """
        return self.action_queue.client.accept_share(self.share_id, self.answer)


class CreateShare(ActionQueueMetaCommand):
    """
    Offer a share to somebody
    """
    def __init__(self, action_queue, node_id, share_to, name, access_level,
                 marker):
        self.action_queue = action_queue
        self.node_id = node_id
        self.share_to = share_to
        self.name = name
        self.access_level = access_level
        self.marker = marker
        self.log = mklog(logger, self.__class__.__name__, UNKNOWN, node_id)

    def store_marker_result(self, (node_id,)):
        """
        Called when all the markers are realized.
        """
        self.node_id = node_id

    def _start(self):
        """
        Do the specialized pre-run setup
        """
        return self.demark(self.node_id)

    def _run(self):
        """
        Do the actual running
        """
        return self.action_queue.client.create_share(self.node_id,
                                                     self.share_to,
                                                     self.name,
                                                     self.access_level)

    def handle_success(self, success):
        """
        It worked! Push the event.
        """
        self.action_queue.event_queue.push('AQ_CREATE_SHARE_OK',
                                           share_id=success.share_id,
                                           marker=self.marker)
        return success

    def handle_failure(self, failure):
        """
        It didn't work! Push the event.
        """
        self.action_queue.event_queue.push('AQ_CREATE_SHARE_ERROR',
                                           marker=self.marker,
                                           error=failure.getErrorMessage())


class GetContentMixin(object):
    """
    Base for ListDir and Download. It's a mixin (ugh) because
    otherwise things would be even more confusing
    """
    def __init__(self, action_queue, share_id, node_id, server_hash,
                 fileobj_factory):
        self.action_queue = action_queue
        self.share_id = share_id
        self.node_id = node_id
        self.server_hash = server_hash
        self.fileobj_factory = fileobj_factory
        self.fileobj = None
        self.gunzip = zlib.decompressobj()
        self.log = mklog(logger, self.__class__.__name__, share_id, node_id,
                         share=share_id, node=node_id, server_hash=server_hash,
                         fileobj_factory=fileobj_factory)
        if (self.share_id, self.node_id) in self.action_queue.downloading:
            self.action_queue.cancel_download(self.share_id, self.node_id)

    def _start(self):
        """
        Do the specialized pre-run setup
        """
        return self.demark(self.node_id)

    def store_marker_result(self, (node_id,)):
        """
        Called when all the markers are realized.
        """
        self.node_id = node_id

    def _run(self):
        """
        Do the actual running
        """
        try:
            self.fileobj = self.fileobj_factory()
        except StandardError:
            # wrap the message in a real exception: Failure wants an
            # exception instance, not a bare string
            return defer.fail(Failure(Exception(
                        'unable to build fileobj (file went away?)'
                        ' so aborting the download.')))
        self.action_queue.downloading[self.share_id,
                                      self.node_id] = {'n_bytes_read': 0}

        self.action_queue.event_queue.push('AQ_DOWNLOAD_STARTED',
                                           share_id=self.share_id,
                                           node_id=self.node_id,
                                           server_hash=self.server_hash)

        req = self.action_queue.client.get_content_request(
            self.share_id, self.node_id, self.server_hash,
            callback=self.cb, node_attr_callback=self.nacb)
        self.action_queue.downloading[self.share_id, self.node_id]['req'] = req
        d = req.deferred
        d.addBoth(passit(lambda _:
                             self.action_queue.downloading.pop((self.share_id,
                                                                self.node_id))))
        d.addErrback(passit(lambda _: self.reset_fileobj()))
        return d

    def handle_success(self, _):
        """
        It worked! Push the event.
        """
        self.action_queue.event_queue.push('AQ_DOWNLOAD_FINISHED',
                                           share_id=self.share_id,
                                           node_id=self.node_id,
                                           server_hash=self.server_hash)

    def handle_failure(self, failure):
        """
        It didn't work! Push the event.
        """
        self.action_queue.event_queue.push('AQ_DOWNLOAD_ERROR',
                                           error=failure.getErrorMessage(),
                                           share_id=self.share_id,
                                           node_id=self.node_id,
                                           server_hash=self.server_hash)

    def reset_fileobj(self):
        """
        Rewind and empty the file (i.e. get it ready to try again if
        necessary)
        """
        if self.fileobj is not None:
            self.fileobj.seek(0, 0)
            self.fileobj.truncate(0)

    def cb(self, bytes):
        """
        A streaming decompressor
        """
        dloading = self.action_queue.downloading[self.share_id,
                                                 self.node_id]
        dloading['n_bytes_read'] += len(bytes)
        self.fileobj.write(self.gunzip.decompress(bytes))
        self.fileobj.flush()     # not strictly necessary but nice to
                                 # see the downloaded size

    def nacb(self, **kwargs):
        """
        set the node attrs in the 'currently downloading' dict
        """
        self.action_queue.downloading[self.share_id,
                                      self.node_id].update(kwargs)

    def sync(self, _):
        """
        Flush the buffers and sync them to disk if possible
        """
        self.fileobj.write(self.gunzip.flush())
        self.fileobj.flush()
        if getattr(self.fileobj, 'fileno', None) is not None:
            # it's a real file, with a fileno! Let's sync its data
            # out to disk
            os.fsync(self.fileobj.fileno())
        self.fileobj.close()


class ListDir(GetContentMixin, ActionQueueMetaCommand):
    """
    Get a listing of a directory's contents
    """

class Download(GetContentMixin, ActionQueueContentCommand):
    """
    Get the contents of a file.
    """

class Upload(ActionQueueContentCommand):
    """
    Upload stuff to a file
    """
    retryable_errors = (ActionQueueContentCommand.retryable_errors
                        | set(['UPLOAD_IN_PROGRESS']))

    def __init__(self, action_queue, share_id, node_id, previous_hash, hash,
                 crc32, size, fileobj_factory, tempfile_factory):
        self.action_queue = action_queue
        self.share_id = share_id
        self.node_id = node_id
        self.previous_hash = previous_hash
        self.hash = hash
        self.crc32 = crc32
        self.size = size
        self.fileobj_factory = fileobj_factory
        self.tempfile_factory = tempfile_factory
        self.deflated_size = None
        self.tempfile = None
        self.cancelled = False
        self.upload_req = None
        self.log = mklog(logger, 'upload', share_id, node_id, share=share_id,
                         node=node_id, previous_hash=previous_hash,
                         hash=hash, crc32=crc32, size=size,
                         fileobj_factory=fileobj_factory)
        if (self.share_id, self.node_id) in self.action_queue.uploading:
            self.action_queue.cancel_upload(self.share_id, self.node_id)

    def cancel(self):
        """Cancel the upload."""
        self.cancelled = True
        if self.upload_req is not None:
            self.upload_req.cancel()

    def cleanup(self):
        """
        Cleanup: stop the producer.
        """
        # upload_req is still None if cleanup happens before _run()
        if (self.upload_req is not None
                and self.upload_req.producer is not None):
            self.upload_req.producer.stopProducing()

    def _start(self):
        """
        Do the specialized pre-run setup
        """
        uploading = {"hash": self.hash, "req": self}
        self.action_queue.uploading[self.share_id, self.node_id] = uploading

        d = self.action_queue.zip_queue.zip(self)
        d.addCallback(lambda _: self.demark(self.node_id))
        return d

    def store_marker_result(self, (node_id,)):
        """
        Called when all the markers are realized.
        """
        # update action_queue.uploading with the real node_id
        uploading = self.action_queue.uploading.pop((self.share_id,
                                                     self.node_id))
        self.node_id = node_id
        self.action_queue.uploading[self.share_id, node_id] = uploading


    def _run(self):
        """
        Do the actual running
        """
        uploading = {"hash": self.hash, "deflated_size": self.deflated_size,
                     "req": self}
        self.action_queue.uploading[self.share_id, self.node_id] = uploading

        self.action_queue.event_queue.push('AQ_UPLOAD_STARTED',
                                           share_id=self.share_id,
                                           node_id=self.node_id,
                                           hash=self.hash)

        if getattr(self.tempfile, 'name', None) is not None:
            self.tempfile = open(self.tempfile.name, 'rb')
        f = UploadProgressWrapper(self.tempfile, uploading)
        req = self.action_queue.client.put_content_request(
            self.share_id, self.node_id, self.previous_hash, self.hash,
            self.crc32, self.size, self.deflated_size, f)
        self.upload_req = req
        d = req.deferred
        d.addBoth(passit(lambda _:
                             self.action_queue.uploading.pop((self.share_id,
                                                              self.node_id))))
        d.addBoth(passit(lambda _: self.tempfile.close()))
        return d

    def handle_success(self, _):
        """
        It worked! Push the event.
        """
        if getattr(self.tempfile, 'name', None) is not None:
            os.unlink(self.tempfile.name)
        self.action_queue.event_queue.push('AQ_UPLOAD_FINISHED',
                                           share_id=self.share_id,
                                           node_id=self.node_id,
                                           hash=self.hash)

    def handle_failure(self, failure):
        """
        It didn't work! Push the event.
        """
        if getattr(self.tempfile, 'name', None) is not None:
            os.unlink(self.tempfile.name)
        self.action_queue.event_queue.push('AQ_UPLOAD_ERROR',
                                           error=failure.getErrorMessage(),
                                           share_id=self.share_id,
                                           node_id=self.node_id,
                                           hash=self.hash)
