# Logo Search packages:
# Sourcecode: ubuntuone-client version File versions
#
# hash_queue.py

# ubuntuone.syncdaemon.hash_queue - hash queues
#
# Author: Facundo Batista <facundo@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
'''Module that implements the Hash Queue machinery.'''

from __future__ import with_statement

import logging
import threading
import functools
import Queue
import os

from twisted.internet import reactor

from ubuntuone.storageprotocol.hash import \
    content_hash_factory, crc32


class _Hasher(threading.Thread):
    '''Worker thread that hashes the files whose paths arrive in a queue.

    Paths are taken from the queue one at a time until the end mark is
    received.  For every file that could be hashed, a "HQ_HASH_NEW" event
    is handed to the event queue through reactor.callFromThread.
    '''

    def __init__(self, queue, end_mark, event_queue):
        '''Set up the worker; the thread is not started here.

        queue: object whose blocking get() returns the next path to hash.
        end_mark: sentinel, compared by identity, that ends the thread.
        event_queue: object whose push() receives the "HQ_HASH_NEW" events.
        '''
        threading.Thread.__init__(self)
        self.logger = logging.getLogger('ubuntuone.SyncDaemon.HQ.hasher')
        self.end_mark = end_mark
        self.queue = queue
        # bind the event name once; callers only supply the payload
        self.push = functools.partial(event_queue.push, "HQ_HASH_NEW")

    def run(self):
        '''Hash paths from the queue until the end mark is received.'''
        while True:
            path = self.queue.get()
            if path is self.end_mark:
                break

            self.logger.info("Hasher: got path to hash: %r", path)
            result = self._hash(path)
            if result is None:
                # the file could not be stat'ed, opened or read, so there
                # is nothing to push; do not claim that a hash was pushed
                self.logger.info("Hasher: could not hash path %r", path)
                continue

            # we are not in the reactor thread: let the reactor do the push
            reactor.callFromThread(self.push, path, *result)
            self.logger.info("Hasher: path hash pushed:  path %r  hash %r",
                             path, result)

    def _hash(self, path):
        '''Actually hashes a file.

        Return a (content hash, crc32, size, stat) tuple, where the stat
        result is the one taken before the file was read, or None if the
        file could not be stat'ed, opened or read (IOError / OSError).
        '''
        hasher = content_hash_factory()
        crc = 0
        size = 0
        try:
            initial_stat = os.stat(path)
            # binary mode: hash, crc and size must reflect the exact bytes
            # on disk, with no newline translation or decoding
            with open(path, 'rb') as fh:
                while True:
                    cont = fh.read(65536)
                    if not cont:
                        break
                    hasher.update(cont)
                    crc = crc32(cont, crc)
                    size += len(cont)
        except (IOError, OSError):
            return None

        return hasher.content_hash(), crc, size, initial_stat


class HashQueue(object):
    '''Interface between the real Hasher and the rest of the world.

    Owns the queue of paths waiting to be hashed and the daemon _Hasher
    thread that consumes it.
    '''

    def __init__(self, event_queue):
        '''Create the queue and start the hasher thread.

        event_queue: handed to the _Hasher, which uses its push() method.
        '''
        self.logger = logging.getLogger('ubuntuone.SyncDaemon.HQ')
        self._queue = Queue.Queue()
        # unique sentinel: the hasher stops when it gets this very object
        self._end_mark = object()
        # keep a reference to the worker so it stays reachable after start
        self._hasher = _Hasher(self._queue, self._end_mark, event_queue)
        self._hasher.setDaemon(True)
        self._hasher.start()
        self.logger.info("HashQueue: _hasher started")

    def insert(self, path):
        '''Insert the path of a file to be hashed.'''
        self.logger.info("HashQueue: inserting path %r", path)
        self._queue.put(path)

    def shutdown(self):
        '''Shutdown all resources.

        Only queues the end mark: it does not wait for the hasher thread,
        which stops when it reaches the mark in the queue.
        '''
        self._queue.put(self._end_mark)
        self.logger.info("HashQueue: _hasher stopped")

    def empty(self):
        '''Return whether we are empty or not'''
        return self._queue.empty()

    def __len__(self):
        '''Return the length of the queue (the queue's qsize())'''
        return self._queue.qsize()

# Generated by Doxygen 1.6.0