1
0
mirror of https://github.com/kakwa/ldapcherry synced 2024-05-29 07:08:04 +02:00
ldapcherry/ldapcherry/backend/backendLdap.py
2015-07-11 22:25:21 +02:00

420 lines
14 KiB
Python

# -*- coding: utf-8 -*-
# vim:set expandtab tabstop=4 shiftwidth=4:
#
# The MIT License (MIT)
# LdapCherry
# Copyright (c) 2014 Carpentier Pierre-Francois
import logging
import os
import re

import cherrypy
import ldap
import ldap.dn
import ldap.filter
import ldap.modlist as modlist

import ldapcherry.backend
class DelUserDontExists(Exception):
    """Signal that a user selected for removal could not be found.

    Attributes:
        user: the identifier that was passed to the constructor.
        log: human-readable message describing the failure.
    """

    def __init__(self, user):
        template = "cannot remove user, user <%(user)s> does not exist"
        self.log = template % {'user': user}
        self.user = user
class CaFileDontExist(Exception):
    """Signal that the configured CA certificate file is missing.

    Attributes:
        cafile: the path that was passed to the constructor.
        log: human-readable message describing the failure.
    """

    def __init__(self, cafile):
        template = "CA file %(cafile)s don't exist"
        self.log = template % {'cafile': cafile}
        self.cafile = cafile
# Attribute-selection levels accepted as the ``attrs`` argument of
# Backend._search / Backend._get_user:
#   NO_ATTR          -> an empty attribute list is requested
#   DISPLAYED_ATTRS  -> the backend's configured attribute list
#   LISTED_ATTRS     -> the backend's configured attribute list
#   ALL_ATTRS        -> no restriction (attrlist=None)
NO_ATTR, DISPLAYED_ATTRS, LISTED_ATTRS, ALL_ATTRS = range(4)
class Backend(ldapcherry.backend.Backend):
    """LdapCherry backend that manages users and groups through python-ldap.

    Operations open a short-lived connection (see ``_connect`` and
    ``_bind``) and unbind it once the LDAP call has completed or failed.
    LDAP errors are logged by ``_exception_handler`` and then re-raised.
    """

    def __init__(self, config, logger, name, attrslist, key):
        """Read the backend parameters and prepare the attribute lists.

        Args:
            config: iterable of parameter names for this backend; entries
                of the form ``group_attr.<attribute>`` define group
                membership templates.
            logger: callable invoked as ``logger(severity=..., msg=...)``.
            name: name of this backend, used in log messages.
            attrslist: names of the attributes requested on searches.
            key: attribute used to index the results of ``search``.
        """
        self.config = config
        self._logger = logger
        self.backend_name = name
        self.binddn = self.get_param('binddn')
        self.bindpassword = self.get_param('password')
        self.ca = self.get_param('ca', False)
        self.checkcert = self.get_param('checkcert', 'on')
        self.starttls = self.get_param('starttls', 'off')
        self.uri = self.get_param('uri')
        self.timeout = self.get_param('timeout', 1)
        self.userdn = self.get_param('userdn')
        self.groupdn = self.get_param('groupdn')
        self.user_filter_tmpl = self.get_param('user_filter_tmpl')
        self.group_filter_tmpl = self.get_param('group_filter_tmpl')
        self.search_filter_tmpl = self.get_param('search_filter_tmpl')
        self.dn_user_attr = self.get_param('dn_user_attr')
        self.key = key
        # Raw string keeps '\W' a regex escape; empty chunks (produced by
        # leading/trailing separators) are not valid object classes.
        self.objectclasses = [
            self._str(objectclass)
            for objectclass in re.split(
                r'\W+', self.get_param('objectclasses')
            )
            if objectclass
        ]
        # 'group_attr.<attribute>' parameters map a group attribute to the
        # template used to build the value identifying a member.
        self.group_attrs = {}
        for param in config:
            prefix, _, group_attr = param.partition('.')
            if prefix == 'group_attr':
                self.group_attrs[group_attr] = self.get_param(param)
        self.attrlist = [self._str(a) for a in attrslist]

    def _exception_handler(self, e, basedn=None):
        """Log a description of ``e`` at ERROR level, then re-raise it.

        Args:
            e: exception caught around a python-ldap call.
            basedn: optional search base in use when the error occurred;
                only used to make the "no such object" message precise.

        Raises:
            The exception ``e`` itself, always.
        """
        et = type(e)
        # python-ldap usually carries a dict with 'desc' (and sometimes
        # 'info') as first argument; never let a missing key mask ``e``.
        details = {}
        if getattr(e, 'args', None) and isinstance(e.args[0], dict):
            details = e.args[0]
        desc = details.get('desc', '')
        info = details.get('info', '')

        if et is ldap.OPERATIONS_ERROR:
            msg = (
                "cannot use starttls with ldaps://"
                " uri (uri: " + self.uri + ")"
            )
        elif et is ldap.INVALID_CREDENTIALS:
            msg = (
                "Configuration error, wrong credentials,"
                " unable to connect to ldap with '" + self.binddn + "'"
            )
        elif et is ldap.SERVER_DOWN:
            msg = (
                "Unable to contact ldap server '" +
                self.uri +
                "', check 'auth.ldap.uri'"
                " and ssl/tls configuration"
            )
        elif et is ldap.FILTER_ERROR:
            msg = (
                "Bad search filter, check '" +
                self.backend_name +
                ".*_filter_tmpl' params"
            )
        elif et is ldap.NO_SUCH_OBJECT:
            searched = "" if basedn is None else "'" + basedn + "' "
            msg = (
                "Search DN " + searched +
                "doesn't exist, check '" +
                self.backend_name +
                ".userdn' or '" +
                self.backend_name +
                ".groupdn'"
            )
        elif et is ldap.OBJECT_CLASS_VIOLATION:
            msg = "Configuration error, " + ", ".join(
                part for part in (desc, info) if part
            )
        elif et is ldap.INSUFFICIENT_ACCESS:
            msg = (
                "Access error on '" +
                self.backend_name +
                "' backend, please check your acls in this backend"
            )
        elif et is ldap.ALREADY_EXISTS:
            msg = "adding user failed, " + desc
        else:
            msg = "unknow ldap exception in ldap backend"
        self._logger(severity=logging.ERROR, msg=msg)
        raise e

    def _connect(self):
        """Create a client for ``self.uri`` with the TLS options applied.

        Returns:
            The python-ldap client object, not yet bound.

        Raises:
            CaFileDontExist: a CA file is configured, certificate
                checking is on, and the file does not exist.
            Exception: whatever ``start_tls_s`` raised, after logging.
        """
        ldap_client = ldap.initialize(self.uri)
        ldap.set_option(ldap.OPT_REFERRALS, 0)
        ldap.set_option(ldap.OPT_TIMEOUT, self.timeout)
        ldap.set_option(ldap.OPT_X_TLS_DEMAND, self.starttls == 'on')
        if self.ca and self.checkcert == 'on':
            if not os.path.isfile(self.ca):
                raise CaFileDontExist(self.ca)
            ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self.ca)
        # NOTE: the statements below are order- and scope-sensitive; they
        # were found empirically by the original author. Do not simplify
        # without testing against a real server.
        if self.checkcert == 'off':
            # Both the global and the per-connection option are needed:
            # removing either one breaks the "no verification" mode.
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
            ldap_client.set_option(
                ldap.OPT_X_TLS_REQUIRE_CERT,
                ldap.OPT_X_TLS_NEVER
            )
        else:
            ldap_client.set_option(
                ldap.OPT_X_TLS_REQUIRE_CERT,
                ldap.OPT_X_TLS_DEMAND
            )
            # Counter-intuitive: the global option is set to "never", yet
            # per the original author the certificate is still checked,
            # and checking only works with this combination.
            ldap.set_option(
                ldap.OPT_X_TLS_REQUIRE_CERT,
                ldap.OPT_X_TLS_NEVER
            )
        if self.starttls == 'on':
            try:
                ldap_client.start_tls_s()
            except Exception as e:
                # Release the client before reporting, as _bind does.
                ldap_client.unbind_s()
                self._exception_handler(e)
        return ldap_client

    def _bind(self):
        """Return a client bound with the configured service account.

        Raises:
            Exception: whatever the bind raised, after unbinding and
                logging.
        """
        ldap_client = self._connect()
        try:
            ldap_client.simple_bind_s(self.binddn, self.bindpassword)
        except Exception as e:
            ldap_client.unbind_s()
            self._exception_handler(e)
        return ldap_client

    def _search(self, searchfilter, attrs, basedn):
        """Run a subtree search and return the raw python-ldap result.

        Args:
            searchfilter: LDAP filter string (already escaped).
            attrs: one of NO_ATTR, DISPLAYED_ATTRS, LISTED_ATTRS or
                ALL_ATTRS; any other value behaves like ALL_ATTRS.
            basedn: DN under which to search.

        Returns:
            What ``search_s`` returned.

        Raises:
            Exception: whatever the search raised, after unbinding and
                logging.
        """
        if attrs == NO_ATTR:
            attrlist = []
        elif attrs in (DISPLAYED_ATTRS, LISTED_ATTRS):
            # TODO: DISPLAYED_ATTRS currently requests too many attributes
            attrlist = self.attrlist
        else:
            attrlist = None
        ldap_client = self._bind()
        try:
            results = ldap_client.search_s(
                basedn,
                ldap.SCOPE_SUBTREE,
                searchfilter,
                attrlist=attrlist
            )
        except Exception as e:
            ldap_client.unbind_s()
            self._exception_handler(e, basedn)
        ldap_client.unbind_s()
        return results

    def _get_user(self, username, attrs=ALL_ATTRS):
        """Find the first entry matching ``username`` under ``userdn``.

        Args:
            username: value substituted (filter-escaped) for
                ``%(username)s`` in ``user_filter_tmpl``.
            attrs: attribute-selection level, see ``_search``.

        Returns:
            ``None`` when nothing matches; the entry's DN when ``attrs``
            is NO_ATTR; otherwise the first result item as returned by
            the search (indexed as ``[0]`` = DN, ``[1]`` = attributes by
            the callers).
        """
        user_filter = self.user_filter_tmpl % {
            # escape so that the value cannot alter the filter structure
            'username': ldap.filter.escape_filter_chars(username)
        }
        results = self._search(user_filter, attrs, self.userdn)
        if not results:
            return None
        if attrs == NO_ATTR:
            return results[0][0]
        return results[0]

    def _str(self, s):
        """Encode ``s`` to UTF-8."""
        return s.encode('utf-8')

    def _uni(self, s):
        """Decode ``s`` from UTF-8."""
        return s.decode('utf-8')

    def _decode_attrs(self, raw_attrs):
        """Decode a mapping of attribute name -> list of raw values.

        Single-valued attributes are returned as one decoded value,
        anything else as a list of decoded values.
        """
        decoded = {}
        for attr, values in raw_attrs.items():
            if len(values) == 1:
                decoded[attr] = self._uni(values[0])
            else:
                decoded[attr] = [self._uni(value) for value in values]
        return decoded

    def _group_member_values(self, username):
        """Return ``(dn, values)`` used to format the group templates.

        ``values`` is a copy of the user's attributes plus a ``'dn'``
        key. Single-valued attributes are collapsed to their value so
        that a template such as ``%(uid)s`` renders the value itself
        instead of the textual form of a one-element list.

        NOTE: the user is expected to exist; a failed lookup is not
        handled here (the lookup result is indexed directly).
        """
        entry = self._get_user(username, ALL_ATTRS)
        dn = entry[0]
        values = {}
        for attr, value in entry[1].items():
            if isinstance(value, (list, tuple)) and len(value) == 1:
                value = value[0]
            values[attr] = value
        values['dn'] = dn
        return dn, values

    def auth(self, username, password):
        """Check ``password`` by binding as the user's own entry.

        Returns:
            True when the bind succeeds; False when the user is unknown,
            the credentials are rejected, or the password is empty.
        """
        # A simple bind with an empty password is an "unauthenticated
        # bind" that servers may accept: it must never authenticate.
        if not password:
            return False
        binddn = self._get_user(username, NO_ATTR)
        if binddn is None:
            return False
        ldap_client = self._connect()
        try:
            ldap_client.simple_bind_s(binddn, password)
        except ldap.INVALID_CREDENTIALS:
            return False
        finally:
            # always release the connection, including on other errors
            ldap_client.unbind_s()
        return True

    def add_user(self, attrs):
        """Create a user entry under ``userdn``.

        Args:
            attrs: mapping of attribute name -> value; it must contain
                the attribute named by ``dn_user_attr``, which forms the
                first component of the new DN.

        Raises:
            Exception: whatever the add raised, after unbinding and
                logging.
        """
        attrs_str = {}
        for attr in attrs:
            attrs_str[self._str(attr)] = self._str(attrs[attr])
        attrs_str['objectClass'] = self.objectclasses
        # Build everything before binding so a missing attribute cannot
        # leave a connection open; escape the value so DN metacharacters
        # in it cannot change the structure of the DN.
        dn = (
            self.dn_user_attr +
            '=' +
            ldap.dn.escape_dn_chars(attrs[self.dn_user_attr]) +
            ',' +
            self.userdn
        )
        ldif = modlist.addModlist(attrs_str)
        ldap_client = self._bind()
        try:
            ldap_client.add_s(dn, ldif)
        except Exception as e:
            ldap_client.unbind_s()
            self._exception_handler(e)
        ldap_client.unbind_s()

    def del_user(self, username):
        """Delete the entry of ``username``.

        Raises:
            DelUserDontExists: no entry matches ``username``.
            Exception: whatever the delete raised, after unbinding and
                logging.
        """
        # Resolve the DN first: nothing to release if the user is unknown.
        dn = self._get_user(username, NO_ATTR)
        if dn is None:
            raise DelUserDontExists(username)
        ldap_client = self._bind()
        try:
            ldap_client.delete_s(dn)
        except Exception as e:
            ldap_client.unbind_s()
            self._exception_handler(e)
        ldap_client.unbind_s()

    def set_attrs(self, username, attrs):
        """Set attributes on the entry of ``username``, one at a time.

        Args:
            username: user whose entry is modified.
            attrs: mapping of attribute name -> new value.

        NOTE: the user is expected to exist; a failed lookup is not
        handled here (the lookup result is indexed directly).

        Raises:
            Exception: whatever a modify raised, after unbinding and
                logging.
        """
        # Look the entry up before binding so a failed lookup cannot
        # leave a connection open.
        entry = self._get_user(username, ALL_ATTRS)
        dn = entry[0]
        old_attrs = entry[1]
        ldap_client = self._bind()
        for name in attrs:
            attr = self._str(name)
            new = {attr: self._str(attrs[name])}
            if attr in old_attrs:
                old = {attr: old_attrs[attr]}
            else:
                old = {}
            ldif = modlist.modifyModlist(old, new)
            try:
                ldap_client.modify_s(dn, ldif)
            except Exception as e:
                ldap_client.unbind_s()
                self._exception_handler(e)
        ldap_client.unbind_s()

    def add_to_groups(self, username, groups):
        """Add ``username`` to every group of ``groups``.

        For each group (given by DN) and each configured ``group_attr``
        template, the formatted value is added to the group entry. A
        value that is already present is logged at INFO level and
        skipped.

        Raises:
            Exception: whatever a modify raised (other than "value
                exists"), after unbinding and logging.
        """
        dn, values = self._group_member_values(username)
        ldap_client = self._bind()
        for group in groups:
            group = self._str(group)
            for attr in self.group_attrs:
                content = self._str(self.group_attrs[attr] % values)
                self._logger(
                    severity=logging.DEBUG,
                    msg="%(backend)s: adding user '%(user)s'"
                        " with dn '%(dn)s' to group '%(group)s' by"
                        " setting '%(attr)s' to '%(content)s'" % {
                            'user': username,
                            'dn': dn,
                            'group': group,
                            'attr': attr,
                            'content': content,
                            'backend': self.backend_name
                        }
                )
                ldif = modlist.modifyModlist({}, {attr: content})
                try:
                    ldap_client.modify_s(group, ldif)
                except ldap.TYPE_OR_VALUE_EXISTS:
                    self._logger(
                        severity=logging.INFO,
                        msg="%(backend)s: user '%(user)s'"
                            " already member of group '%(group)s'"
                            "(attribute '%(attr)s')" % {
                                'user': username,
                                'group': group,
                                'attr': attr,
                                'backend': self.backend_name
                            }
                    )
                except Exception as e:
                    ldap_client.unbind_s()
                    self._exception_handler(e)
        ldap_client.unbind_s()

    def del_from_groups(self, username, groups):
        """Remove ``username`` from every group of ``groups``.

        For each group (given by DN) and each configured ``group_attr``
        template, the formatted value is deleted from the group entry.
        A value that is not present is logged at INFO level and skipped.

        Raises:
            Exception: whatever a modify raised (other than "no such
                attribute"), after unbinding and logging.
        """
        dn, values = self._group_member_values(username)
        ldap_client = self._bind()
        for group in groups:
            group = self._str(group)
            for attr in self.group_attrs:
                content = self._str(self.group_attrs[attr] % values)
                ldif = [(ldap.MOD_DELETE, attr, content)]
                try:
                    ldap_client.modify_s(group, ldif)
                except ldap.NO_SUCH_ATTRIBUTE:
                    self._logger(
                        severity=logging.INFO,
                        msg="%(backend)s: user '%(user)s'"
                            " wasn't member of group '%(group)s'"
                            " (attribute '%(attr)s')" % {
                                'user': username,
                                'group': group,
                                'attr': attr,
                                'backend': self.backend_name
                            }
                    )
                except Exception as e:
                    ldap_client.unbind_s()
                    self._exception_handler(e)
        ldap_client.unbind_s()

    def search(self, searchstring):
        """Search users with ``search_filter_tmpl``.

        Args:
            searchstring: text substituted (filter-escaped) for
                ``%(searchstring)s`` in the template.

        Returns:
            dict mapping the value of the ``key`` attribute to the
            decoded attributes of each matching entry; entries without
            that attribute are left out.
        """
        ret = {}
        searchfilter = self.search_filter_tmpl % {
            # escape so that the value cannot alter the filter structure
            'searchstring': ldap.filter.escape_filter_chars(searchstring)
        }
        for entry in self._search(searchfilter, DISPLAYED_ATTRS, self.userdn):
            attrs = self._decode_attrs(entry[1])
            if self.key in attrs:
                ret[attrs[self.key]] = attrs
        return ret

    def get_user(self, username):
        """Return the decoded attributes of ``username``.

        NOTE: the user is expected to exist; a failed lookup is not
        handled here (the lookup result is indexed directly).
        """
        return self._decode_attrs(self._get_user(username, ALL_ATTRS)[1])

    def get_groups(self, username):
        """Return the DNs of the groups matching ``group_filter_tmpl``.

        The template receives ``%(userdn)s`` (the DN found for the user,
        or ``None`` when there is none) and ``%(username)s``, both
        filter-escaped.
        """
        userdn = self._get_user(username, NO_ATTR)
        if userdn is not None:
            userdn = ldap.filter.escape_filter_chars(userdn)
        searchfilter = self.group_filter_tmpl % {
            'userdn': userdn,
            'username': ldap.filter.escape_filter_chars(username)
        }
        groups = self._search(searchfilter, NO_ATTR, self.groupdn)
        return [self._uni(entry[0]) for entry in groups]