ldapcherry/ldapcherry/backend/backendAD.py

189 lines
5.7 KiB
Python

# -*- coding: utf-8 -*-
# vim:set expandtab tabstop=4 shiftwidth=4:
#
# License MIT
# LdapCherry
# Copyright (c) 2014 Carpentier Pierre-Francois
import ldapcherry.backend.backendLdap
import cherrypy
import ldap
import ldap.modlist as modlist
import ldap.filter
import logging
import ldapcherry.backend
from ldapcherry.exceptions import UserDoesntExist, GroupDoesntExist
import os
import re
class CaFileDontExist(Exception):
    """Error describing a CA certificate file that does not exist.

    Instance attributes:
      cafile -- the path that was handed to the constructor
      log    -- a ready-to-log message naming that path
    """

    def __init__(self, cafile):
        message = "CA file %(cafile)s don't exist" % {'cafile': cafile}
        self.log = message
        self.cafile = cafile
# Selector values handed to the user lookup (`_get_user`); this module passes
# NO_ATTR when it only uses the lookup result as the user's DN.  The other
# levels presumably select progressively larger attribute sets -- confirm
# against the LDAP backend that consumes them.
NO_ATTR, DISPLAYED_ATTRS, LISTED_ATTRS, ALL_ATTRS = range(4)
# Generated by the following command:
# samba-tool group list | \
# while read line; \
# do
# ldapsearch -x -h localhost -D "administrator@dc.ldapcherry.org" \
# -w qwertyP455 -b "dc=dc,dc=ldapcherry,dc=org" "(cn=$line)" dn; \
# done | grep -e "dn: .*CN=Builtin" | \
# sed "s/dn: CN=\(.*\),CN=.*/'\1',/"
# Names of the groups that live under the CN=Builtin container; any group
# whose name is listed here gets the Builtin DN suffix instead of the
# regular group suffix when its DN is built.
AD_BUILTIN_GROUPS = [
    "Pre-Windows 2000 Compatible Access",
    "Windows Authorization Access Group",
    "Certificate Service DCOM Access",
    "Network Configuration Operators",
    "Terminal Server License Servers",
    "Incoming Forest Trust Builders",
    "Performance Monitor Users",
    "Cryptographic Operators",
    "Distributed COM Users",
    "Performance Log Users",
    "Remote Desktop Users",
    "Account Operators",
    "Event Log Readers",
    "Backup Operators",
    "Server Operators",
    "Print Operators",
    "Administrators",
    "Replicator",
    "IIS_IUSRS",
    "Guests",
    "Users",
]
class Backend(ldapcherry.backend.backendLdap.Backend):
    """Active Directory flavour of the generic LDAP backend.

    Differences from the parent that are set up here:

    * the service account binds as ``<login>@<domain>``;
    * users and regular groups are both looked up under
      ``CN=Users,<base dn>``; built-in groups under ``CN=Builtin,<base dn>``;
    * the user key attribute is always ``sAMAccountName``;
    * group membership is expressed through the groups' ``member``
      attribute.
    """

    def __init__(self, config, logger, name, attrslist, key):
        """Read the backend parameters and derive the AD-specific settings.

        :param config: backend configuration, stored as ``self.config``
        :param logger: logging callable, invoked as
            ``logger(severity=..., msg=...)``
        :param name: name of this backend instance (appears in log messages)
        :param attrslist: attribute names handled by this backend; each one
            is normalised with ``self._str``
        :param key: accepted for interface compatibility only; the key
            attribute is hard-wired to ``sAMAccountName``
        """
        self.config = config
        self._logger = logger
        self.backend_name = name
        self.backend_display_name = self.get_param('display_name')
        self.domain = self.get_param('domain')
        self.login = self.get_param('login')
        # 'example.org' -> 'dc=example,DC=org'
        basedn = 'dc=' + self.domain.replace('.', ',DC=')
        self.binddn = self.login + '@' + self.domain
        self.bindpassword = self.get_param('password')
        self.ca = self.get_param('ca', False)
        self.checkcert = self.get_param('checkcert', 'on')
        self.starttls = self.get_param('starttls', 'off')
        self.uri = self.get_param('uri')
        self.timeout = self.get_param('timeout', 1)

        self.userdn = 'CN=Users,' + basedn
        # Regular groups share the users' container.
        self.groupdn = self.userdn
        self.builtin = 'CN=Builtin,' + basedn

        self.user_filter_tmpl = '(sAMAccountName=%(username)s)'
        self.group_filter_tmpl = '(member=%(userdn)s)'
        # NOTE(review): the cn clause appears twice in this filter; it is
        # redundant but harmless, and is kept as-is.
        self.search_filter_tmpl = (
            '(&(|(sAMAccountName=%(searchstring)s)'
            '(cn=%(searchstring)s*)'
            '(name=%(searchstring)s*)'
            '(sn=%(searchstring)s*)'
            '(givenName=%(searchstring)s*)'
            '(cn=%(searchstring)s*))'
            '(&(objectClass=person)'
            '(objectClass=user)'
            '(!(objectClass=computer)))'
            ')'
        )
        self.dn_user_attr = 'cn'
        self.key = 'sAMAccountName'
        self.objectclasses = [
            'top',
            'person',
            'organizationalPerson',
            'user',
            'posixAccount',
        ]
        self.group_attrs = {
            'member': "%(dn)s"
        }
        self.attrlist = [self._str(a) for a in attrslist]

    def _search_group(self, searchfilter, groupdn):
        """Return the entries below ``groupdn`` that match ``searchfilter``.

        Only the ``CN`` attribute is requested.  The connection opened for
        the search is closed exactly once, whether the search succeeds or
        fails.

        :param searchfilter: LDAP filter string
        :param groupdn: base DN of the subtree to search
        :returns: the raw ``search_s`` result list
        """
        searchfilter = self._str(searchfilter)
        ldap_client = self._bind()
        results = []
        try:
            results = ldap_client.search_s(
                groupdn,
                ldap.SCOPE_SUBTREE,
                searchfilter,
                attrlist=['CN']
            )
        except Exception as e:
            # The inherited handler is expected to log and re-raise; should
            # it ever return, fall through with an empty result rather than
            # referencing a variable that was never assigned.
            self._exception_handler(e)
        finally:
            ldap_client.unbind_s()
        return results

    def _build_groupdn(self, groups):
        """Turn group names into full group DNs.

        Names listed in ``AD_BUILTIN_GROUPS`` are placed under the Builtin
        container, every other name under the regular group container.

        :param groups: iterable of group names (CN values)
        :returns: list of DN strings, in the same order as ``groups``
        """
        return [
            'cn=' + group + ',' + (
                self.builtin if group in AD_BUILTIN_GROUPS else self.groupdn
            )
            for group in groups
        ]

    def add_to_groups(self, username, groups):
        """Add ``username`` to ``groups`` (given by name, not by DN)."""
        ad_groups = self._build_groupdn(groups)
        super(Backend, self).add_to_groups(username, ad_groups)

    def del_from_groups(self, username, groups):
        """Remove ``username`` from ``groups`` (given by name, not by DN)."""
        ad_groups = self._build_groupdn(groups)
        super(Backend, self).del_from_groups(username, ad_groups)

    def get_groups(self, username):
        """Return the names (CN) of the groups ``username`` is a member of.

        Both the regular group container and the Builtin container are
        searched.

        :param username: login name of the user
        :returns: list of group CN values; empty if the user lookup yields
            no DN
        """
        username = ldap.filter.escape_filter_chars(username)
        userdn = self._get_user(username, NO_ATTR)
        if userdn is None:
            # No DN to match against (presumably an unknown user -- confirm
            # against `_get_user`); searching for the literal "None" would
            # be meaningless.
            return []
        searchfilter = self.group_filter_tmpl % {
            # A DN may legitimately contain '\', '(', ')' or '*'
            # (e.g. "CN=Doe\, John,..."); those must be escaped before the
            # DN is used as a filter assertion value.
            'userdn': ldap.filter.escape_filter_chars(userdn),
            'username': username
        }

        groups = self._search_group(searchfilter, self.groupdn)
        groups = groups + self._search_group(searchfilter, self.builtin)

        self._logger(
            severity=logging.DEBUG,
            msg="%(backend)s: groups of '%(user)s' are %(groups)s" % {
                'user': username,
                'groups': str(groups),
                'backend': self.backend_name
            }
        )
        # Each result entry is a (dn, attributes) pair.
        return [entry[1]['cn'][0] for entry in groups]

    def auth(self, username, password):
        """Check ``username``/``password`` with a simple bind.

        The bind is attempted as ``<username>@<domain>``.

        :returns: ``True`` if the bind succeeds; ``False`` if the server
            reports invalid credentials or if the username or password is
            empty
        :raises: any other LDAP error raised by the bind is propagated
        """
        # A simple bind with an empty password is an "unauthenticated bind",
        # which a server may accept; it must never count as a successful
        # login.
        if not username or not password:
            return False

        binddn = username + '@' + self.domain
        ldap_client = self._connect()
        try:
            ldap_client.simple_bind_s(binddn, password)
        except ldap.INVALID_CREDENTIALS:
            return False
        finally:
            # Release the connection on every path, including unexpected
            # LDAP errors.
            ldap_client.unbind_s()
        return True