# -*- coding: utf-8 -*-
# git+git://github.com/Sheeprider/BitBucket-api.git
__all__ = ['Bitbucket', ]
try:
from urlparse import parse_qs
except ImportError:
from urllib.parse import parse_qs
import json
import re
from requests import Request, Session
from requests_oauthlib import OAuth1
import requests
from .issue import Issue
from .repository import Repository
from .service import Service
from .ssh import SSH
from .deploy_key import DeployKey
# ========
# = URLs =
# ========
URLS = {
'BASE': 'https://bitbucket.org/!api/1.0/%s',
# Get user profile and repos
'GET_USER': 'users/%(username)s/',
'GET_USER_PRIVILEGES': 'user/privileges',
# Search repo
# 'SEARCH_REPO': 'repositories/?name=%(search)s',
# Get tags & branches
'GET_TAGS': 'repositories/%(username)s/%(repo_slug)s/tags/',
'GET_BRANCHES': 'repositories/%(username)s/%(repo_slug)s/branches/',
'REQUEST_TOKEN': 'oauth/request_token/',
'AUTHENTICATE': 'oauth/authenticate?oauth_token=%(token)s',
'ACCESS_TOKEN': 'oauth/access_token/'
}
[docs]class Bitbucket(object):
""" This class lets you interact with the bitbucket public API. """
def __init__(self, username='', password='', repo_name_or_slug=''):
self.username = username
self.password = password
self.repo_slug = repo_name_or_slug
self.repo_tree = {}
self.URLS = URLS
self.repository = Repository(self)
self.service = Service(self)
self.ssh = SSH(self)
self.issue = Issue(self)
self.deploy_key = DeployKey(self)
self.access_token = None
self.access_token_secret = None
self.consumer_key = None
self.consumer_secret = None
self.oauth = None
# ===================
# = Getters/Setters =
# ===================
@property
def auth(self):
""" Return credentials for current Bitbucket user. """
if self.oauth:
return self.oauth
return (self.username, self.password)
@property
def username(self):
"""Return your repository's username."""
return self._username
@username.setter
def username(self, value):
try:
if isinstance(value, basestring):
self._username = unicode(value)
except NameError:
self._username = value
if value is None:
self._username = None
@username.deleter
def username(self):
del self._username
@property
def password(self):
"""Return your repository's password."""
return self._password
@password.setter
def password(self, value):
try:
if isinstance(value, basestring):
self._password = unicode(value)
except NameError:
self._password = value
if value is None:
self._password = None
@password.deleter
def password(self):
del self._password
@property
def repo_slug(self):
"""Return your repository's slug name."""
return self._repo_slug
@repo_slug.setter
def repo_slug(self, value):
if value is None:
self._repo_slug = None
else:
try:
if isinstance(value, basestring):
value = unicode(value)
except NameError:
pass
value = value.lower()
self._repo_slug = re.sub(r'[^a-z0-9_-]+', '-', value)
@repo_slug.deleter
def repo_slug(self):
del self._repo_slug
# ========================
# = Oauth authentication =
# ========================
[docs] def authorize(self, consumer_key, consumer_secret, callback_url=None,
access_token=None, access_token_secret=None):
"""
Call this with your consumer key, secret and callback URL, to
generate a token for verification.
"""
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
if not access_token and not access_token_secret:
if not callback_url:
return (False, "Callback URL required")
oauth = OAuth1(
consumer_key,
client_secret=consumer_secret,
callback_uri=callback_url)
r = requests.post(self.url('REQUEST_TOKEN'), auth=oauth)
if r.status_code == 200:
creds = parse_qs(r.content)
self.access_token = creds.get('oauth_token')[0]
self.access_token_secret = creds.get('oauth_token_secret')[0]
else:
return (False, r.content)
else:
self.finalize_oauth(access_token, access_token_secret)
return (True, None)
[docs] def verify(self, verifier, consumer_key=None, consumer_secret=None,
access_token=None, access_token_secret=None):
"""
After converting the token into verifier, call this to finalize the
authorization.
"""
# Stored values can be supplied to verify
self.consumer_key = consumer_key or self.consumer_key
self.consumer_secret = consumer_secret or self.consumer_secret
self.access_token = access_token or self.access_token
self.access_token_secret = access_token_secret or self.access_token_secret
oauth = OAuth1(
self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.access_token,
resource_owner_secret=self.access_token_secret,
verifier=verifier)
r = requests.post(self.url('ACCESS_TOKEN'), auth=oauth)
if r.status_code == 200:
creds = parse_qs(r.content)
else:
return (False, r.content)
self.finalize_oauth(creds.get('oauth_token')[0],
creds.get('oauth_token_secret')[0])
return (True, None)
[docs] def finalize_oauth(self, access_token, access_token_secret):
""" Called internally once auth process is complete. """
self.access_token = access_token
self.access_token_secret = access_token_secret
# Final OAuth object
self.oauth = OAuth1(
self.consumer_key,
client_secret=self.consumer_secret,
resource_owner_key=self.access_token,
resource_owner_secret=self.access_token_secret)
# ======================
# = High lvl functions =
# ======================
[docs] def dispatch(self, method, url, auth=None, params=None, **kwargs):
""" Send HTTP request, with given method,
credentials and data to the given URL,
and return the success and the result on success.
"""
r = Request(
method=method,
url=url,
auth=auth,
params=params,
data=kwargs)
s = Session()
resp = s.send(r.prepare())
status = resp.status_code
text = resp.text
error = resp.reason
if status >= 200 and status < 300:
if text:
try:
return (True, json.loads(text))
except TypeError:
pass
except ValueError:
pass
return (True, text)
elif status >= 300 and status < 400:
return (
False,
'Unauthorized access, '
'please check your credentials.')
elif status >= 400 and status < 500:
return (False, 'Service not found.')
elif status >= 500 and status < 600:
return (False, 'Server error.')
else:
return (False, error)
[docs] def url(self, action, **kwargs):
""" Construct and return the URL for a specific API service. """
# TODO : should be static method ?
return self.URLS['BASE'] % self.URLS[action] % kwargs
# =====================
# = General functions =
# =====================
[docs] def get_user(self, username=None):
""" Returns user informations.
If username is not defined, tries to return own informations.
"""
username = username or self.username or ''
url = self.url('GET_USER', username=username)
response = self.dispatch('GET', url)
try:
return (response[0], response[1]['user'])
except TypeError:
pass
return response
[docs] def get_branches(self, repo_slug=None):
""" Get a single repository on Bitbucket and return its branches."""
repo_slug = repo_slug or self.repo_slug or ''
url = self.url('GET_BRANCHES',
username=self.username,
repo_slug=repo_slug)
return self.dispatch('GET', url, auth=self.auth)
[docs] def get_privileges(self):
""" Get privledges for this user. """
url = self.url('GET_USER_PRIVILEGES')
return self.dispatch('GET', url, auth=self.auth)