Skip to content

Attach response controls to exceptions #251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Doc/reference/ldap.rst
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ The module defines the following exceptions:
is set to a truncated form of the name provided or alias dereferenced
for the lowest entry (object or alias) that was matched.

The :py:const:`ctrls` field can be included to the dictionary, which is a list of response controls.

.. py:exception:: ADMINLIMIT_EXCEEDED

Expand Down
1 change: 1 addition & 0 deletions Doc/reference/slapdtest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ Classes

.. autoclass:: slapdtest.SlapdTestCase
:members:

30 changes: 30 additions & 0 deletions Lib/slapdtest/_slapdtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

# a template string for generating simple slapd.conf file
SLAPD_CONF_TEMPLATE = r"""

serverID %(serverid)s
%(moduleload_directives)s
moduleload back_%(database)s
%(include_directives)s
loglevel %(loglevel)s
Expand All @@ -43,6 +45,8 @@
rootdn "%(rootdn)s"
rootpw "%(rootpw)s"

%(overlay_configurations)s

TLSCACertificateFile "%(cafile)s"
TLSCertificateFile "%(servercert)s"
TLSCertificateKeyFile "%(serverkey)s"
Expand Down Expand Up @@ -187,6 +191,19 @@ class SlapdObject(object):
'core.schema',
)

#: List (or tuple) of OpenLDAP module names you want to activate.
#: Default is empty.
modules = ()

#: List (or tuple) of OpenLDAP overlay settings you want to include.
#: Default is empty.
#: Each element is a dict of the form of::
#:
#: {"name": overlay_name,
#: "configuration": configuration_text}
#:
overlays = ()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these attributes can be overridden in subclasses, they should be documented.


TMPDIR = os.environ.get('TMP', os.getcwd())
if 'SCHEMA' in os.environ:
SCHEMADIR = os.environ['SCHEMA']
Expand Down Expand Up @@ -331,9 +348,22 @@ def gen_config(self):
)
for schema_file in self.openldap_schema_files
)

moduleload_directives = '\n'.join(
"moduleload {module}".format(module=module)
for module in self.modules
)

overlay_configurations = '\n'.join(
"overlay {name}\n{configuration}".format(**overlay)
for overlay in self.overlays
)

config_dict = {
'serverid': hex(self.server_id),
'schema_prefix':self._schema_prefix,
'moduleload_directives': moduleload_directives,
'overlay_configurations': overlay_configurations,
'include_directives': include_directives,
'loglevel': self.slapd_loglevel,
'database': self.database,
Expand Down
30 changes: 17 additions & 13 deletions Modules/LDAPObject.c
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,20 @@ l_ldap_result4(LDAPObject *self, PyObject *args)
LDAP_END_ALLOW_THREADS(self);
}

if (!(pyctrls = LDAPControls_to_List(serverctrls))) {
int err = LDAP_NO_MEMORY;

LDAP_BEGIN_ALLOW_THREADS(self);
ldap_set_option(self->ldap, LDAP_OPT_ERROR_NUMBER, &err);
LDAP_END_ALLOW_THREADS(self);
ldap_msgfree(msg);
Py_XDECREF(valuestr);
return LDAPerror(self->ldap, "LDAPControls_to_List");
}
ldap_controls_free(serverctrls);

/* Always call Py_XDECREF(pyctrls) before returning after here */

if (result != LDAP_SUCCESS) { /* result error */
char *e, err[1024];

Expand All @@ -1173,21 +1187,11 @@ l_ldap_result4(LDAPObject *self, PyObject *args)
e = "ldap_parse_result";
ldap_msgfree(msg);
Py_XDECREF(valuestr);
return LDAPerror(self->ldap, e);
retval = LDAPraise_exception(self->ldap, e, pyctrls);
Py_XDECREF(pyctrls);
return retval;
}

if (!(pyctrls = LDAPControls_to_List(serverctrls))) {
int err = LDAP_NO_MEMORY;

LDAP_BEGIN_ALLOW_THREADS(self);
ldap_set_option(self->ldap, LDAP_OPT_ERROR_NUMBER, &err);
LDAP_END_ALLOW_THREADS(self);
ldap_msgfree(msg);
Py_XDECREF(valuestr);
return LDAPerror(self->ldap, "LDAPControls_to_List");
}
ldap_controls_free(serverctrls);

pmsg =
LDAPmessage_to_python(self->ldap, msg, add_ctrls, add_intermediates);

Expand Down
17 changes: 16 additions & 1 deletion Modules/constants.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ LDAPerr(int errnum)

/* Convert an LDAP error into an informative python exception */
PyObject *
LDAPerror(LDAP *l, char *msg)
LDAPraise_exception(LDAP *l, char *msg, PyObject *pyctrls)
{
if (l == NULL) {
PyErr_SetFromErrno(LDAPexception_class);
Expand Down Expand Up @@ -104,6 +104,10 @@ LDAPerror(LDAP *l, char *msg)
ldap_memfree(matched);
}

if (pyctrls != NULL) {
PyDict_SetItemString(info, "ctrls", pyctrls);
}

if (errnum == LDAP_REFERRAL) {
str = PyUnicode_FromString(msg);
if (str)
Expand All @@ -125,6 +129,17 @@ LDAPerror(LDAP *l, char *msg)
}
}


/* Convert an LDAP error into an informative python exception.
This is the convenient function for the case where the exception
doesn't have to include any response controls. */
PyObject *
LDAPerror(LDAP *l, char *msg)
{
return LDAPraise_exception(l, msg, NULL);
}


/* initialise the module constants */

int
Expand Down
1 change: 1 addition & 0 deletions Modules/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ extern PyObject *LDAPconstant(int);

extern PyObject *LDAPexception_class;
extern PyObject *LDAPerror(LDAP *, char *msg);
extern PyObject *LDAPraise_exception(LDAP *, char *msg, PyObject *pyctrls);
PyObject *LDAPerr(int errnum);

#ifndef LDAP_CONTROL_PAGE_OID
Expand Down
6 changes: 6 additions & 0 deletions Tests/t_ldap_controls_ppolicy.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

PP_GRACEAUTH = b'0\x84\x00\x00\x00\t\xa0\x84\x00\x00\x00\x03\x81\x01\x02'
PP_TIMEBEFORE = b'0\x84\x00\x00\x00\t\xa0\x84\x00\x00\x00\x03\x80\x012'
PP_ACCOUNT_LOCKOUT = b'0\x03\x81\x01\x01'


class TestControlsPPolicy(unittest.TestCase):
Expand All @@ -28,6 +29,11 @@ def test_ppolicy_timebefore(self):
pp.decodeControlValue(PP_TIMEBEFORE)
self.assertPPolicy(pp, timeBeforeExpiration=50)

def test_ppolicy_account_lockout(self):
pp = ppolicy.PasswordPolicyControl()
pp.decodeControlValue(PP_ACCOUNT_LOCKOUT)
self.assertPPolicy(pp, error=1)


if __name__ == '__main__':
unittest.main()
109 changes: 107 additions & 2 deletions Tests/t_ldapobject.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@
os.environ['LDAPNOINIT'] = '1'

import ldap
import ldap.controls
import ldap.controls.ppolicy
from ldap.ldapobject import SimpleLDAPObject, ReconnectLDAPObject

from slapdtest import SlapdTestCase
from slapdtest import SlapdTestCase, SlapdObject
from slapdtest import requires_ldapi, requires_sasl, requires_tls


Expand Down Expand Up @@ -75,6 +76,110 @@
"""


class PPolicyEnabledSlapdObject(SlapdObject):
"""
A subclass of :py:class:`SlapdObject` with password policy enabled.
Note that this class has no actual password policy configuration entries.
It is the job of the users of this class to define
the default password policies on their own.
The dn of the default is :attr:`.default_ppolicy_dn` of this class.
"""

openldap_schema_files = (
'core.schema', 'ppolicy.schema'
)
modules = (
'ppolicy',
)

default_ppolicy_dn = "cn=default-ppolicy,%(suffix)s" % {
'suffix': SlapdObject.suffix
}

overlays = (
{
'name': 'ppolicy',
'configuration': "\n".join([
'ppolicy_default "{}"'.format(default_ppolicy_dn),
# let slapd tell the clients that they are locked out
'ppolicy_use_lockout'])
},
)


class Test02_ResponseControl(SlapdTestCase):
"""
tests abount response controls sent by the server
"""

ldap_object_class = SimpleLDAPObject
server_class = PPolicyEnabledSlapdObject

@classmethod
def setUpClass(cls):
super(Test02_ResponseControl, cls).setUpClass()
# insert some Foo* objects via ldapadd
cls.server.ldapadd(
LDIF_TEMPLATE % {
'suffix': cls.server.suffix,
'rootdn': cls.server.root_dn,
'rootcn': cls.server.root_cn,
'rootpw': cls.server.root_pw,
'dc': cls.server.suffix.split(',')[0][3:],
}
)

# Very strict pwdMaxFailure in order to easily test the cases where
# bind failure with response controls is needed
cls.server.ldapadd(
'''dn: {dn}
objectClass: organizationalRole
objectClass: pwdPolicy
cn: default-ppolicy
pwdAttribute: userPassword
pwdLockout: TRUE
pwdMaxFailure: 1
pwdLockoutDuration: 60
pwdFailureCountInterval: 3600'''.format(dn=cls.server.default_ppolicy_dn)
)

def test_response_controls_are_attached_to_exceptions(self):
base = self.server.suffix
cn = "test_response_controls_are_attached_to_exceptions"
user_dn = "cn={},{}".format(cn, base)
password = "user5_pw"

self.server.ldapadd(
'''dn: {dn}
objectClass: applicationProcess
objectClass: simpleSecurityObject
cn: {cn}
userPassword: {password}'''.format(cn=cn, dn=user_dn, password=password)
)

ldap_conn = self.ldap_object_class(self.server.ldap_uri)

# Firstly cause a bind failure to lock out the account
with self.assertRaises(ldap.INVALID_CREDENTIALS) as cm:
wrong_password = 'wrong' + password
ldap_conn.simple_bind_s(user_dn, wrong_password)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also check here that ctrls is empty?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checked.
Thank you for quick and detailed review.


empty_controls = cm.exception.args[0]['ctrls']
self.assertEqual(len(empty_controls), 0)

with self.assertRaises(ldap.INVALID_CREDENTIALS) as cm:
ldap_conn.simple_bind_s(
user_dn, password,
serverctrls=[ldap.controls.ppolicy.PasswordPolicyControl()])

controls = cm.exception.args[0]['ctrls']
decoded_controls = ldap.controls.DecodeControlTuples(controls)
self.assertEqual(len(decoded_controls), 1)
pp = decoded_controls[0]
expected_error = ldap.controls.ppolicy.PasswordPolicyError('accountLocked')
self.assertEqual(pp.error, int(expected_error))


class Test00_SimpleLDAPObject(SlapdTestCase):
"""
test LDAP search operations
Expand Down