Logo Search packages:      
Sourcecode: ubuntuone-client version File versions

test_auth.py

# test_auth - Tests for ubuntuone.oauthdesktop.auth module
#
# Author: Stuart Langridge <stuart.langridge@canonical.com>
#
# Copyright 2009 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.
"""Tests for the OAuth client code for StorageFS."""

import gnomekeyring
from contrib.mocker import ANY, IN, MockerTestCase
import testresources

from ubuntuone.storageprotocol import oauth
from ubuntuone.oauthdesktop.auth import (AuthorisationClient,
                                                NoAccessToken)


00029 class AuthorisationClientTests(MockerTestCase):
    """Test the GNOME keyring integration portions of the auth code."""

00032     def setUp(self):
        """Sets up a mock keyring."""
        MockerTestCase.setUp(self)
        self.keyring = self.mocker.mock()
        self.client = None
        self.item = self.mocker.mock(gnomekeyring.Found)

        self.item_id = 999

        ex = self.expect(self.item.item_id)
        ex.result(self.item_id)
        ex.count(0, None)

        ex = self.expect(self.item.secret)
        ex.result('oauth_token=access_key&oauth_token_secret=access_secret')
        ex.count(0, None)

00049     def expect_token_query(self):
        """Expects the keyring to be queried for a token."""
        return self.expect(
            self.keyring.find_items_sync(
                gnomekeyring.ITEM_GENERIC_SECRET,
                {'ubuntuone-realm': 'realm',
                 'oauth-consumer-key': 'consumer_key'})
            )

00058     def expect_token_store(self):
        """Expects the token to be stored in the keyring."""
        self.expect(self.keyring.item_create_sync(
            None, gnomekeyring.ITEM_GENERIC_SECRET,
            'UbuntuOne token for realm',
            {'ubuntuone-realm': 'realm', 'oauth-consumer-key': 'consumer_key'},
            # Either order for the token and secret is valid in the item secret
            IN(['oauth_token=access_key&oauth_token_secret=access_secret',
                'oauth_token_secret=access_secret&oauth_token=access_key']),
            True)).result(self.item_id)

00069     def mock_has_token(self):
        """Mocks a cached token in the keyring."""
        self.expect_token_query().result([self.item])

00073     def mock_no_token(self, exception):
        """Mocks no token in the keyring."""
        self.expect_token_query().throw(exception)

00077     def replay(self, callback_parent=None, callback_denied=None,
                     do_login=True):
        """Starts the replay phase and sets up a client object to be tested,
        wired up to the mock keyring.

        """
        self.mocker.replay()
        self.client = AuthorisationClient(
            'realm', 'request_token_url', 'user_authorisation_url',
            'access_token_url', 'consumer_key', 'consumer_secret',
            callback_parent, callback_denied, do_login, keyring=self.keyring)

00089     def test_get_access_token(self):
        """The get_access_token method returns the access token"""
        self.mock_has_token()
        self.replay()
        token = self.client.get_access_token()
        self.assertTrue(isinstance(token, oauth.OAuthToken))
        self.assertEqual(token.key, 'access_key')
        self.assertEqual(token.secret, 'access_secret')

00098     def test_get_access_token_no_match(self):
        """The get_access_token method fails if there are no matching items"""
        self.mock_no_token(gnomekeyring.NoMatchError)
        self.replay()
        self.assertRaises(NoAccessToken, self.client.get_access_token)

00104     def test_get_access_token_denied(self):
        """The get_access_token method fails if access is denied"""
        self.mock_no_token(gnomekeyring.DeniedError)
        self.replay()
        self.assertRaises(NoAccessToken, self.client.get_access_token)

00110     def test_have_access_token(self):
        """The `have_access_token` method returns True if a the
        keyring contains a token."""
        self.mock_has_token()
        self.replay()
        self.assertEqual(self.client.have_access_token(), True)

00117     def test_have_access_token_fail(self):
        """The `have_access_token` method returns False if the keyring
        does not contain a token."""
        self.mock_no_token(gnomekeyring.NoMatchError)
        self.replay()
        self.assertEqual(self.client.have_access_token(), False)

00124     def test_store_token(self):
        """The store_token method correctly stores an item in the keyring,
           and correctly sets an ACL on it."""
        self.expect_token_store()
        saka = self.mocker.replace(
          "ubuntuone.oauthdesktop.key_acls.set_all_key_acls")
        saka(item_id=self.item_id)
        self.mocker.result(None)

        sleep = self.mocker.replace("time.sleep")
        sleep(4)
        self.mocker.result(None)

        self.replay()
        self.client.store_token(oauth.OAuthToken('access_key', 'access_secret'))

00140     def test_clear_existing_token(self):
        """Makes sure that clear token clears an existing token."""
        self.mock_has_token()
        self.expect(self.keyring.item_delete_sync(None, self.item_id))
        self.replay()
        self.client.clear_token()

00147     def test_clear_no_existing_token(self):
        """Makes sure that clear with no existing token still works."""
        self.mock_no_token(gnomekeyring.NoMatchError)
        self.replay()
        self.client.clear_token()

00153     def test_ensure_access_token(self):
        """If the user already has a token, no new token is requested."""
        self.mock_has_token()
        callback_function = self.mocker.mock()
        callback_function(ANY)
        self.replay(callback_parent=callback_function)
        self.client.ensure_access_token()

00161     def test_ensure_access_token_no_token(self):
        """If the user has no token, a new one is requested and stored, via
           an OAuth callback to the internal webserver."""

        self.mock_no_token(gnomekeyring.NoMatchError)

        request_token = self.mocker.mock()
        self.expect(request_token.key).result('access_key').count(0, None)
        ex = self.expect(request_token.secret)
        ex.result('access_secret').count(0, None)
        make_token_request = self.mocker.mock()
        self.expect(make_token_request(ANY)).result(request_token)

        open_in_browser = self.mocker.mock()
        open_in_browser(ANY)

        ri = self.mocker.replace("random.randint")
        ri(1000000, 10000000)
        self.mocker.result(12345678)
        get_temporary_httpd = self.mocker.mock()
        get_temporary_httpd(12345678, ANY, True)
        self.mocker.result("http://callbackurl")

        self.replay(callback_parent=lambda a: None)

        self.client.make_token_request = make_token_request
        self.client.open_in_browser = open_in_browser
        self.client.get_temporary_httpd = get_temporary_httpd
        # skip the "are we online, via networkmanager" bit
        self.client.acquire_access_token_if_online = \
            self.client.acquire_access_token
        self.client.ensure_access_token()


00195 class AcquireAccessTokenTests(testresources.ResourcedTestCase):
    """OAuth token acquisition tests."""

    def setUp(self):
        super(AcquireAccessTokenTests, self).setUp()

    def tearDown(self):
        super(AcquireAccessTokenTests, self).tearDown()

00204     def test_acquire_access_token(self):
        """Test that acquire_access_token() can acquire the access token."""

        def make_token_request(oauth_request):
            """Make an OAuth token request via the test browser."""
            return oauth.OAuthToken.from_string("oauth_token=access_token&" +
                                                "oauth_token_secret=" +
                                                "access_secret")

        def open_in_browser(url):
            """Just return as we aren't subscribed to the page."""
            return

        def got_token(token):
            """Called with the token once auth is completed"""
            self.assertTrue(isinstance(token, oauth.OAuthToken))

        def get_temporary_httpd(nonce, retrieve_function, store):
            """Mock the temporary httpd and return a callback URL"""
            # returns callback URL; this is an invalid URL, of course,
            # (port is too high) but we check later that the mechanize
            # browser tries to navigate there
            return "http://127.0.0.1:99999/?nonce=99999"

        def store_token(token):
            """Don't use the keyring; do nothing"""
            pass

        client = AuthorisationClient(
            'http://ubuntuone/',
            'http://ubuntuone/oauth/request/',
            'http://ubuntuone/oauth/authorize/',
            'http://ubuntuone/oauth/access/',
            'consumer_key', 'consumer_secret',
            callback_parent=got_token)
        client.make_token_request = make_token_request
        client.open_in_browser = open_in_browser
        client.get_temporary_httpd = get_temporary_httpd
        client.store_token = store_token

        client.acquire_access_token('token description')

Generated by  Doxygen 1.6.0   Back to index