Commit 8ea2dc22 authored by Miquel Torres's avatar Miquel Torres
Browse files

Remove tastypie dependency

parent 86b29e12
# -*- coding: utf-8 -*-
"""
Tests related to RESTful API
"""
from datetime import datetime
import copy
import json
import logging
import unittest
from django import test
from django.db.utils import IntegrityError
from django.test.client import Client
from django.http import HttpRequest
from django.core.urlresolvers import reverse
from django.conf import settings
from django.contrib.auth.models import User, Permission
from tastypie.authorization import (Authorization, ReadOnlyAuthorization,
DjangoAuthorization)
from tastypie.exceptions import ImmediateHttpResponse, Unauthorized
from tastypie.models import ApiKey, create_api_key
from tastypie.http import HttpUnauthorized
from tastypie.authentication import ApiKeyAuthentication
from codespeed.api import EnvironmentResource, ProjectResource
from codespeed.models import (Project, Benchmark, Revision, Branch,
Executable, Environment, Result, Report)
from codespeed.api import ResultBundle
from codespeed import settings as default_settings
class FixtureTestCase(test.TestCase):
fixtures = ["gettimeline_unittest.json"]
def setUp(self):
self.api_user = User.objects.create_user(
username='apiuser', email='api@foo.bar', password='password')
self.api_user.save()
self.api_user.user_permissions.clear()
john_doe = User.objects.create_user(
username='johndoe', email='api@foo.bar', password='password')
john_doe.save()
authorization = 'ApiKey {0}:{1}'.format(self.api_user.username,
self.api_user.api_key.key)
self.post_auth = {'HTTP_AUTHORIZATION': authorization}
def generic_test_authorized(self, method, request, function_list, function_detail):
bundle = self.resource.build_bundle(request=request)
bundle.request.method = method
self.assertTrue(
function_detail(self.resource.get_object_list(bundle.request)[0],
bundle))
def generic_test_unauthorized(self, method, request, function_list, function_detail):
bundle = self.resource.build_bundle(request=request)
bundle.request.method = method
self.assertRaises(Unauthorized, function_detail,
self.resource.get_object_list(bundle.request)[0], bundle)
class ApiKeyAuthenticationTestCase(FixtureTestCase):
def setUp(self):
super(ApiKeyAuthenticationTestCase, self).setUp()
ApiKey.objects.all().delete()
self.auth = ApiKeyAuthentication()
self.request = HttpRequest()
# Simulate sending the signal.
user = User.objects.get(username='apiuser')
create_api_key(User, instance=user, created=True)
def test_is_not_authenticated(self):
"""Should return HttpUnauthorized when incorrect credentials are given"""
# No username/api_key details
self.assertEqual(isinstance(
self.auth.is_authenticated(self.request), HttpUnauthorized), True)
# Wrong username details.
self.request.GET['username'] = 'foo'
self.assertEqual(isinstance(
self.auth.is_authenticated(self.request), HttpUnauthorized), True)
# No api_key.
self.request.GET['username'] = 'daniel'
self.assertEqual(isinstance(
self.auth.is_authenticated(self.request), HttpUnauthorized), True)
# Wrong user/api_key.
self.request.GET['username'] = 'daniel'
self.request.GET['api_key'] = 'foo'
self.assertEqual(isinstance(
self.auth.is_authenticated(self.request), HttpUnauthorized), True)
def test_is_authenticated(self):
"""Should correctly authenticate when using an existing user and key"""
# Correct user/api_key.
user = User.objects.get(username='apiuser')
self.request.GET['username'] = 'apiuser'
self.request.GET['api_key'] = user.api_key.key
self.assertEqual(self.auth.is_authenticated(self.request), True)
class UserTest(FixtureTestCase):
"""Test api user related stuff"""
def test_has_apikey(self):
"""User() should have an api key attr that was generated automatically."""
self.assertTrue(hasattr(self.api_user, 'api_key'))
def test_len_apikey(self):
"""Should have api user key with a non-zero length."""
self.assertTrue(len(self.api_user.api_key.key) >= 1)
def test_is_authenticated_header(self):
"""Taken from tastypie test suite to ensure api key is generated for
new users correctly and tastypie is installed correctly.
"""
auth = ApiKeyAuthentication()
request = HttpRequest()
# Simulate sending the signal.
john_doe = User.objects.get(username='johndoe')
# No username/api_key details should fail.
self.assertEqual(isinstance(auth.is_authenticated(request), HttpUnauthorized), True)
# Wrong username details.
request.META['HTTP_AUTHORIZATION'] = 'foo'
self.assertEqual(isinstance(auth.is_authenticated(request), HttpUnauthorized), True)
# No api_key.
request.META['HTTP_AUTHORIZATION'] = 'ApiKey daniel'
self.assertEqual(isinstance(auth.is_authenticated(request), HttpUnauthorized), True)
# Wrong user/api_key.
request.META['HTTP_AUTHORIZATION'] = 'ApiKey daniel:pass'
self.assertEqual(isinstance(auth.is_authenticated(request), HttpUnauthorized), True)
# Correct user/api_key.
john_doe = User.objects.get(username='johndoe')
request.META['HTTP_AUTHORIZATION'] = 'ApiKey johndoe:%s' % john_doe.api_key.key
self.assertEqual(auth.is_authenticated(request), True)
# Capitalization shouldn't matter.
john_doe = User.objects.get(username='johndoe')
request.META['HTTP_AUTHORIZATION'] = 'aPiKeY johndoe:%s' % john_doe.api_key.key
self.assertEqual(auth.is_authenticated(request), True)
def test_api_key(self):
"""User should be authenticated by it's api key."""
auth = ApiKeyAuthentication()
request = HttpRequest()
authorization = 'ApiKey %s:%s' % (self.api_user.username,
self.api_user.api_key.key)
request.META['HTTP_AUTHORIZATION'] = authorization
self.assertEqual(auth.is_authenticated(request), True)
class EnvironmentDjangoAuthorizationTestCase(FixtureTestCase):
"""Test Environment() API"""
def setUp(self):
super(EnvironmentDjangoAuthorizationTestCase, self).setUp()
self.add = Permission.objects.get_by_natural_key(
'add_environment', 'codespeed', 'environment')
self.change = Permission.objects.get_by_natural_key(
'change_environment', 'codespeed', 'environment')
self.delete = Permission.objects.get_by_natural_key(
'delete_environment', 'codespeed', 'environment')
self.env1_data = dict(
name="env1",
cpu="cpu1",
memory="48kB",
os="ZX Spectrum OS",
kernel="2.6.32"
)
self.env1 = Environment(**self.env1_data)
self.env1.save()
self.env2_data = dict(
name="env2",
cpu="z80",
memory="64kB",
os="ZX Spectrum OS",
kernel="2.6.32"
)
env_db1 = Environment.objects.get(id=1)
self.env_db1_data = dict(
[(k, getattr(env_db1, k)) for k in self.env1_data.keys()]
)
self.client = Client()
self.resource = EnvironmentResource()
self.auth = self.resource._meta.authorization
def test_no_perms(self):
"""User() should have only GET permission"""
# sanity check: user has no permissions
self.assertFalse(self.api_user.get_all_permissions())
request = HttpRequest()
request.method = 'GET'
request.user = self.api_user
# with no permissions, api is read-only
self.generic_test_authorized('GET', request, self.auth.read_list,
self.auth.read_detail)
methods = [('POST', self.auth.create_list, self.auth.create_detail),
('PUT', self.auth.update_list, self.auth.update_detail),
('DELETE', self.auth.delete_list, self.auth.delete_detail)]
for method, function_list, function_detail in methods:
self.generic_test_unauthorized(method, request, function_list,
function_detail)
def test_add_perm(self):
"""User() should have add permission granted."""
request = HttpRequest()
request.user = self.api_user
# give add permission
request.user.user_permissions.add(self.add)
self.generic_test_authorized('POST', request, self.auth.create_list,
self.auth.create_detail)
def test_change_perm(self):
"""User() should have change permission granted."""
request = HttpRequest()
request.user = self.api_user
# give change permission
request.user.user_permissions.add(self.change)
self.generic_test_authorized('PUT', request, self.auth.update_list,
self.auth.update_detail)
def test_delete_perm(self):
"""User() should have delete permission granted."""
request = HttpRequest()
request.user = self.api_user
# give delete permission
request.user.user_permissions.add(self.delete)
self.generic_test_authorized('DELETE', request, self.auth.delete_list,
self.auth.delete_detail)
def test_all(self):
"""User() should have add, change, delete permissions granted."""
request = HttpRequest()
request.user = self.api_user
request.user.user_permissions.add(self.add)
request.user.user_permissions.add(self.change)
request.user.user_permissions.add(self.delete)
methods = [('GET', self.auth.read_list, self.auth.read_detail),
('OPTIONS', self.auth.read_list, self.auth.read_detail),
('HEAD', self.auth.read_list, self.auth.read_detail),
('POST', self.auth.create_list, self.auth.create_detail),
('PUT', self.auth.update_list, self.auth.update_detail),
('PATCH', self.auth.read_list, self.auth.read_detail),
('PATCH', self.auth.update_list, self.auth.update_detail),
('PATCH', self.auth.delete_list, self.auth.delete_detail),
('DELETE', self.auth.delete_list, self.auth.delete_detail)]
for method, function_list, function_detail in methods:
self.generic_test_authorized(method, request, function_list,
function_detail)
def test_patch_perms(self):
"""User() should have patch (add, change, delete) permissions granted."""
request = HttpRequest()
request.user = self.api_user
request.method = 'PATCH'
# Not enough.
request.user.user_permissions.add(self.add)
self.generic_test_unauthorized('PATCH', request, self.auth.update_list,
self.auth.update_detail)
self.generic_test_unauthorized('PATCH', request, self.auth.delete_list,
self.auth.delete_detail)
# Still not enough.
request.user.user_permissions.add(self.change)
self.generic_test_unauthorized('PATCH', request, self.auth.delete_list,
self.auth.delete_detail)
# Much better.
request.user.user_permissions.add(self.delete)
# Nuke the perm cache. :/
del request.user._perm_cache
self.generic_test_authorized('PATCH', request, self.auth.delete_list,
self.auth.delete_detail)
def test_unrecognized_method(self):
"""User() should not have the permission to call non-existent method."""
request = HttpRequest()
self.api_user.user_permissions.clear()
request.user = self.api_user
# Check a non-existent HTTP method.
request.method = 'EXPLODE'
self.generic_test_unauthorized(
'EXPLODE', request, self.auth.update_list,
self.auth.update_detail)
def test_get_environment(self):
"""Should get an environment when given an existing ID"""
response = self.client.get(
'/api/v1/environment/1/?username={0}&api_key={1}'.format(
self.api_user.username, self.api_user.api_key.key))
self.assertEquals(response.status_code, 200)
self.assertEqual(json.loads(response.content)['name'], "Dual Core")
def test_get_non_existing_environment(self):
"""Should return 404 when given a non existing environment ID"""
response = self.client.get(
'/api/v1/environment/999/?username={0}&api_key={1}'.format(
self.api_user.username, self.api_user.api_key.key))
self.assertEquals(response.status_code, 404)
def test_get_environment_all_fields(self):
"""Should get all fields for an environment"""
response = self.client.get(
'/api/v1/environment/{0}/?username={1}&api_key={2}'.format(
self.env1.pk, self.api_user.username, self.api_user.api_key.key)
)
self.assertEquals(response.status_code, 200)
for k in self.env1_data.keys():
self.assertEqual(
json.loads(response.content)[k], getattr(self.env1, k))
def test_post(self):
"""Should save a new environment"""
request = HttpRequest()
request.user = self.api_user
request.user.user_permissions.add(self.add)
request.user.user_permissions.add(self.change)
request.user.user_permissions.add(self.delete)
response = self.client.post('/api/v1/environment/',
data=json.dumps(self.env2_data),
content_type='application/json')
self.assertEquals(response.status_code, 401)
response = self.client.post('/api/v1/environment/',
data=json.dumps(self.env2_data),
content_type='application/json',
**self.post_auth)
self.assertEquals(response.status_code, 201)
id = response['Location'].rsplit('/', 2)[-2]
#response = self.client.get('/api/v1/environment/{0}/'.format(id))
response = self.client.get(
'/api/v1/environment/{0}/?username={1}&api_key={2}'.format(
id, self.api_user.username, self.api_user.api_key.key)
)
for k, v in self.env2_data.items():
self.assertEqual(
json.loads(response.content)[k], v)
response = self.client.delete('/api/v1/environment/{0}/'.format(id),
content_type='application/json',
**self.post_auth)
self.assertEquals(response.status_code, 204)
def test_put(self):
"""Should modify an existing environment"""
modified_data = copy.deepcopy(self.env_db1_data)
modified_data['name'] = "env2.2"
modified_data['memory'] = "128kB"
response = self.client.put('/api/v1/environment/1/',
data=json.dumps(modified_data),
content_type='application/json',
**self.post_auth)
self.assertEquals(response.status_code, 401)
request = HttpRequest()
request.user = self.api_user
request.user.user_permissions.add(self.change)
response = self.client.put('/api/v1/environment/1/',
data=json.dumps(modified_data),
content_type='application/json',
**self.post_auth)
self.assertEquals(response.status_code, 204)
response = self.client.get('/api/v1/environment/1/')
response = self.client.get(
'/api/v1/environment/1/?username={0}&api_key={1}'.format(
self.api_user.username, self.api_user.api_key.key)
)
for k, v in modified_data.items():
self.assertEqual(
json.loads(response.content)[k], v)
def test_delete(self):
"""Should delete an environment"""
response = self.client.get(
'/api/v1/environment/1/?username={0}&api_key={1}'.format(
self.api_user.username, self.api_user.api_key.key))
self.assertEquals(response.status_code, 200)
# from fixture
response = self.client.delete('/api/v1/environment/1/',
content_type='application/json',
**self.post_auth)
self.assertEquals(response.status_code, 401)
request = HttpRequest()
request.user = self.api_user
request.user.user_permissions.add(self.delete)
response = self.client.delete('/api/v1/environment/1/',
content_type='application/json',
**self.post_auth)
self.assertEquals(response.status_code, 204)
response = self.client.get(
'/api/v1/environment/1/?username={0}&api_key={1}'.format(
self.api_user.username, self.api_user.api_key.key))
self.assertEquals(response.status_code, 404)
# from just created data
response = self.client.get(
'/api/v1/environment/{0}/?username={1}&api_key={2}'.format(
self.env1.pk, self.api_user.username, self.api_user.api_key.key)
)
self.assertEquals(response.status_code, 200)
response = self.client.delete(
'/api/v1/environment/{0}/'.format(self.env1.id),
content_type='application/json',
**self.post_auth
)
self.assertEquals(response.status_code, 204)
response = self.client.get(
'/api/v1/environment/{0}/?username={1}&api_key={2}'.format(
self.env1.pk, self.api_user.username, self.api_user.api_key.key)
)
self.assertEquals(response.status_code, 404)
class ProjectTest(FixtureTestCase):
"""Test Project() API"""
def setUp(self):
super(ProjectTest, self).setUp()
self.add = Permission.objects.get_by_natural_key(
'add_project', 'codespeed', 'project')
self.change = Permission.objects.get_by_natural_key(
'change_project', 'codespeed', 'project')
self.delete = Permission.objects.get_by_natural_key(
'delete_project', 'codespeed', 'project')
self.project_data = dict(
name="PyPy",
repo_type="M",
repo_path="ssh://hg@bitbucket.org/pypy/pypy",
repo_user="fridolin",
repo_pass="secret",
)
self.project_data2 = dict(
name="project alpha",
repo_type="M",
repo_path="ssh://hg@bitbucket.org/pypy/pypy",
repo_user="alpha",
repo_pass="beta",
)
self.project = Project(**self.project_data)
self.project.save()
self.client = Client()
self.resource = ProjectResource()
self.auth = self.resource._meta.authorization
def test_all(self):
"""User should have all permissions granted."""
request = HttpRequest()
request.user = self.api_user
request.user.user_permissions.add(self.add)
request.user.user_permissions.add(self.change)
request.user.user_permissions.add(self.delete)
methods = [('GET', self.auth.read_list, self.auth.read_detail),
('OPTIONS', self.auth.read_list, self.auth.read_detail),
('HEAD', self.auth.read_list, self.auth.read_detail),
('POST', self.auth.create_list, self.auth.create_detail),
('PUT', self.auth.update_list, self.auth.update_detail),
('PATCH', self.auth.read_list, self.auth.read_detail),
('PATCH', self.auth.update_list, self.auth.update_detail),
('PATCH', self.auth.delete_list, self.auth.delete_detail),
('DELETE', self.auth.delete_list, self.auth.delete_detail)]
for method, function_list, function_detail in methods:
self.generic_test_authorized(method, request, function_list,
function_detail)
def test_get_project(self):
"""Should get an existing project"""
response = self.client.get('/api/v1/project/{0}/'.format(
self.project.id,))
self.assertEquals(response.status_code, 200)
self.assertEqual(json.loads(response.content)['name'], "{0}".format(
self.project_data['name']))
def test_get_project_all_fields(self):
"""Should get all fields for a project"""
response = self.client.get('/api/v1/project/%s/' % (self.project.id,))
self.assertEquals(response.status_code, 200)
for k in self.project_data.keys():
self.assertEqual(
json.loads(response.content)[k], getattr(self.project, k))
def test_post(self):
"""Should save a new project"""
request = HttpRequest()
request.user = self.api_user
request.user.user_permissions.add(self.add)
request.user.user_permissions.add(self.change)
request.user.user_permissions.add(self.delete)
response = self.client.post('/api/v1/project/',
data=json.dumps(self.project_data2),
content_type='application/json')
self.assertEquals(response.status_code, 401)
response = self.client.post('/api/v1/project/',
data=json.dumps(self.project_data2),
content_type='application/json',
**self.post_auth)
self.assertEquals(response.status_code, 201)
id = response['Location'].rsplit('/', 2)[-2]
response = self.client.get(
'/api/v1/project/{0}/?username={1}&api_key={2}'.format(
self.project.id, self.api_user.username, self.api_user.api_key.key)
)
for k, v in self.project_data.items():
self.assertEqual(
json.loads(response.content)[k], v)
def test_delete(self):
"""Should delete an project"""
response = self.client.delete('/api/v1/project/{0}/'.format(
self.project.id,), content_type='application/json')
self.assertEquals(response.status_code, 401)
request = HttpRequest()
request.user = self.api_user
request.user.user_permissions.add(self.delete)
response = self.client.delete(
'/api/v1/project/{0}/'.format(self.project.id,),
content_type='application/json',
**self.post_auth
)
self.assertEquals(response.status_code, 204)
response = self.client.get('/api/v1/project/{0}/'.format(
self.project.id,)
)
self.assertEquals(response.status_code, 404)
class ExecutableTest(FixtureTestCase):
"""Test Executable() API"""
def setUp(self):
self.data = dict(name="Fibo", description="Fibonacci the Lame",)
# project is a ForeignKey and is not added automatically by tastypie
self.project = Project.objects.get(pk=1)
self.executable = Executable(project=self.project, **self.data)
self.executable.save()
self.client = Client()
super(ExecutableTest, self).setUp()
def test_get_executable(self):
"""Should get an existing executable"""
response = self.client.get('/api/v1/executable/{0}/'.format(
self.executable.id,))
self.assertEquals(response.status_code, 200)
self.assertEqual(json.loads(response.content)['name'], "{0}".format(
self.data['name']))
def test_get_executable_all_fields(self):
"""Should get all fields for an executable"""
response = self.client.get('/api/v1/executable/%s/' % (
self.executable.id,))
self.assertEquals(response.status_code, 200)
for k in self.data.keys():
self.assertEqual(
json.loads(response.content)[k], self.data[k])