Port LDAP wrapper to NPTH.

* agent/gpg-agent.c (handle_connections): Handle error.
* dirmngr/dirmngr_ldap.c, dirmngr/ldap-wrapper-ce.c: Port to NPTH.
This commit is contained in:
Marcus Brinkmann 2012-01-24 17:37:01 +01:00 committed by Werner Koch
parent ccbb4c3652
commit 4074f96627
3 changed files with 182 additions and 72 deletions

View File

@ -1866,7 +1866,9 @@ handle_connections (gnupg_fd_t listen_fd, gnupg_fd_t listen_fd_ssh)
#endif
ret = npth_attr_init(&tattr);
/* FIXME: Check error. */
if (ret)
log_fatal ("error allocating thread attributes: %s\n",
gpg_strerror (ret));
npth_attr_setdetachstate (&tattr, NPTH_CREATE_DETACHED);
#ifndef HAVE_W32_SYSTEM

View File

@ -58,13 +58,13 @@
#include "i18n.h"
#include "util.h"
/* With the ldap wrapper, there is no need for the pth_enter and leave
/* With the ldap wrapper, there is no need for the npth_unprotect and leave
functions; thus we redefine them to nops. If we are not using the
ldap wrapper process we need to include the prototype for our
module's main function. */
#ifdef USE_LDAPWRAPPER
static void pth_enter (void) { }
static void pth_leave (void) { }
static void npth_unprotect (void) { }
static void npth_protect (void) { }
#else
# include "./ldap-wrapper.h"
#endif
@ -392,9 +392,9 @@ print_ldap_entries (my_opt_t myopt, LDAP *ld, LDAPMessage *msg, char *want_attr)
LDAPMessage *item;
int any = 0;
for (pth_enter (), item = ldap_first_entry (ld, msg), pth_leave ();
for (npth_unprotect (), item = ldap_first_entry (ld, msg), npth_protect ();
item;
pth_enter (), item = ldap_next_entry (ld, item), pth_leave ())
npth_unprotect (), item = ldap_next_entry (ld, item), npth_protect ())
{
BerElement *berctx;
char *attr;
@ -414,11 +414,11 @@ print_ldap_entries (my_opt_t myopt, LDAP *ld, LDAPMessage *msg, char *want_attr)
}
for (pth_enter (), attr = my_ldap_first_attribute (ld, item, &berctx),
pth_leave ();
for (npth_unprotect (), attr = my_ldap_first_attribute (ld, item, &berctx),
npth_protect ();
attr;
pth_enter (), attr = my_ldap_next_attribute (ld, item, berctx),
pth_leave ())
npth_unprotect (), attr = my_ldap_next_attribute (ld, item, berctx),
npth_protect ())
{
struct berval **values;
int idx;
@ -455,9 +455,9 @@ print_ldap_entries (my_opt_t myopt, LDAP *ld, LDAPMessage *msg, char *want_attr)
}
}
pth_enter ();
npth_unprotect ();
values = my_ldap_get_values_len (ld, item, attr);
pth_leave ();
npth_protect ();
if (!values)
{
@ -618,19 +618,19 @@ fetch_ldap (my_opt_t myopt, const char *url, const LDAPURLDesc *ludp)
set_timeout (myopt);
pth_enter ();
npth_unprotect ();
ld = my_ldap_init (host, port);
pth_leave ();
npth_protect ();
if (!ld)
{
log_error (_("LDAP init to `%s:%d' failed: %s\n"),
host, port, strerror (errno));
return -1;
}
pth_enter ();
npth_unprotect ();
/* Fixme: Can we use MYOPT->user or is it shared with other theeads?. */
ret = my_ldap_simple_bind_s (ld, myopt->user, myopt->pass);
pth_leave ();
npth_protect ();
if (ret)
{
log_error (_("binding to `%s:%d' failed: %s\n"),
@ -640,13 +640,13 @@ fetch_ldap (my_opt_t myopt, const char *url, const LDAPURLDesc *ludp)
}
set_timeout (myopt);
pth_enter ();
npth_unprotect ();
rc = my_ldap_search_st (ld, dn, ludp->lud_scope, filter,
myopt->multi && !myopt->attr && ludp->lud_attrs?
ludp->lud_attrs:attrs,
0,
&myopt->timeout, &msg);
pth_leave ();
npth_protect ();
if (rc == LDAP_SIZELIMIT_EXCEEDED && myopt->multi)
{
if (es_fwrite ("E\0\0\0\x09truncated", 14, 1, myopt->outstream) != 1)

View File

@ -106,12 +106,93 @@ struct outstream_cookie_s
{
int refcount; /* Reference counter - possible values are 1 and 2. */
/* We don't need a mutex for the conditions, as npth provides a
simpler condition interface that relies on the global lock. This
can be used if we never yield between testing the condition and
waiting on it. */
npth_cond_t wait_data; /* Condition that data is available. */
npth_cond_t wait_space; /* Condition that space is available. */
int eof_seen; /* EOF indicator. */
size_t buffer_len; /* The valid length of the BUFFER. */
char buffer[4000]; /* Data ring buffer. */
size_t buffer_len; /* The amount of data in the BUFFER. */
size_t buffer_pos; /* The next read position of the BUFFER. */
char buffer[4000]; /* Data buffer. */
};
#define BUFFER_EMPTY(c) ((c)->buffer_len == 0)
#define BUFFER_FULL(c) ((c)->buffer_len == DIM((c)->buffer))
#define BUFFER_DATA_AVAILABLE(c) ((c)->buffer_len)
#define BUFFER_SPACE_AVAILABLE(c) (DIM((c)->buffer) - (c)->buffer_len)
#define BUFFER_INC_POS(c,n) (c)->buffer_pos = ((c)->buffer_pos + (n)) % DIM((c)->buffer)
#define BUFFER_CUR_POS(c) (&(c)->buffer[(c)->buffer_pos])
static int
buffer_get_data (struct outstream_cookie_s *cookie, char *dst, int cnt)
{
int amount;
int left;
int chunk;
amount = cnt;
if (BUFFER_DATA_AVAILABLE (cookie) < amount)
amount = BUFFER_DATA_AVAILABLE (cookie);
left = amount;
/* How large is the part up to the end of the buffer array? */
chunk = DIM(cookie->buffer) - cookie->buffer_pos;
if (chunk > left)
chunk = left;
memcpy (dst, BUFFER_CUR_POS (cookie), chunk);
BUFFER_INC_POS (cookie, chunk);
left -= chunk;
dst += chunk;
if (left)
{
memcpy (dst, BUFFER_CUR_POS (cookie), left);
BUFFER_INC_POS (cookie, left);
}
return amount;
}
static int
buffer_put_data (struct outstream_cookie_s *cookie, const char *src, int cnt)
{
int amount;
int remain;
int left;
int chunk;
remain = DIM(cookie->buffer) - cookie->buffer_len;
amount = cnt;
if (remain < amount)
amount = remain;
left = amount;
/* How large is the part up to the end of the buffer array? */
chunk = DIM(cookie->buffer) - cookie->buffer_pos;
if (chunk > left)
chunk = left;
memcpy (BUFFER_CUR_POS (cookie), src, chunk);
BUFFER_INC_POS (cookie, chunk);
left -= chunk;
src += chunk;
if (left)
{
memcpy (BUFFER_CUR_POS (cookie), src, left);
BUFFER_INC_POS (cookie, left);
}
cookie->buffer_len -= amount;
return amount;
}
/* The writer function for the outstream. This is used to transfer
the output of the ldap wrapper thread to the ksba reader object. */
@ -120,43 +201,42 @@ outstream_cookie_writer (void *cookie_arg, const void *buffer, size_t size)
{
struct outstream_cookie_s *cookie = cookie_arg;
const char *src;
char *dst;
ssize_t nwritten = 0;
int res;
ssize_t amount = 0;
src = buffer;
do
{
int was_empty = 0;
/* Wait for free space. */
while (cookie->buffer_len == DIM (cookie->buffer))
while (BUFFER_FULL(cookie))
{
/* Buffer is full: Wait for space. */
pth_yield (NULL);
res = npth_cond_wait (&cookie->wait_space, NULL);
if (res)
{
gpg_err_set_errno (res);
return -1;
}
}
if (BUFFER_EMPTY(cookie))
was_empty = 1;
/* Copy data. */
dst = cookie->buffer + cookie->buffer_len;
while (size && cookie->buffer_len < DIM (cookie->buffer))
{
*dst++ = *src++;
size--;
cookie->buffer_len++;
nwritten++;
}
nwritten = buffer_put_data (cookie, buffer, size);
size -= nwritten;
src += nwritten;
amount += nwritten;
if (was_empty)
npth_cond_signal (&cookie->wait_data);
}
while (size); /* Until done. */
if (nwritten)
{
/* Signal data is available - a pth_yield is sufficient because
the test is explicit. To increase performance we could do a
pth_yield to the other thread and only fall back to yielding
to any thread if that returns an error (i.e. the other thread
is not runnable). However our w32pth does not yet support
yielding to a specific thread, thus this won't help. */
pth_yield (NULL);
}
return nwritten;
return amount;
}
@ -165,7 +245,11 @@ outstream_release_cookie (struct outstream_cookie_s *cookie)
{
cookie->refcount--;
if (!cookie->refcount)
xfree (cookie);
{
npth_cond_destroy (&cookie->wait_data);
npth_cond_destroy (&cookie->wait_space);
xfree (cookie);
}
}
@ -198,6 +282,7 @@ outstream_reader_cb (void *cb_value, char *buffer, size_t count,
char *dst;
const char *src;
size_t nread = 0;
int was_full = 0;
if (!buffer && !count && !r_nread)
return gpg_error (GPG_ERR_NOT_SUPPORTED); /* Rewind is not supported. */
@ -205,31 +290,26 @@ outstream_reader_cb (void *cb_value, char *buffer, size_t count,
*r_nread = 0;
dst = buffer;
while (cookie->buffer_pos == cookie->buffer_len)
while (BUFFER_EMPTY(cookie))
{
if (cookie->eof_seen)
return gpg_error (GPG_ERR_EOF);
/* Wait for data to become available. */
pth_yield (NULL);
npth_cond_wait (&cookie->wait_data, NULL);
}
if (BUFFER_FULL(cookie))
was_full = 1;
src = cookie->buffer + cookie->buffer_pos;
while (count && cookie->buffer_pos < cookie->buffer_len)
nread = buffer_get_data (cookie, buffer, count);
if (was_full)
{
*dst++ = *src++;
count--;
cookie->buffer_pos++;
nread++;
npth_cond_signal (&cookie->wait_space);
}
if (cookie->buffer_pos == cookie->buffer_len)
cookie->buffer_pos = cookie->buffer_len = 0;
/* Now there should be some space available. We do this even if
COUNT was zero so to give the writer end a chance to continue. */
pth_yield (NULL);
*r_nread = nread;
return 0; /* Success. */
}
@ -351,10 +431,12 @@ ldap_wrapper (ctrl_t ctrl, ksba_reader_t *r_reader, const char *argv[])
{
gpg_error_t err;
struct ldap_wrapper_thread_parms *parms;
pth_attr_t tattr;
npth_attr_t tattr;
es_cookie_io_functions_t outstream_func = { NULL };
struct outstream_cookie_s *outstream_cookie;
ksba_reader_t reader;
int res;
npth_t thread;
(void)ctrl;
@ -381,6 +463,22 @@ ldap_wrapper (ctrl_t ctrl, ksba_reader_t *r_reader, const char *argv[])
}
outstream_cookie->refcount++;
res = npth_cond_init (&outstream_cookie->wait_data, NULL);
if (res)
{
free_arg_list (parms->arg_list);
xfree (parms);
return gpg_error_from_errno (res);
}
res = npth_cond_init (&outstream_cookie->wait_space, NULL);
if (res)
{
npth_cond_destroy (&outstream_cookie->wait_data);
free_arg_list (parms->arg_list);
xfree (parms);
return gpg_error_from_errno (res);
}
err = ksba_reader_new (&reader);
if (!err)
err = ksba_reader_set_release_notify (reader,
@ -407,27 +505,37 @@ ldap_wrapper (ctrl_t ctrl, ksba_reader_t *r_reader, const char *argv[])
if (!parms->outstream)
{
err = gpg_error_from_syserror ();
free_arg_list (parms->arg_list);
ksba_reader_release (reader);
outstream_release_cookie (outstream_cookie);
free_arg_list (parms->arg_list);
xfree (parms);
return err;
}
outstream_cookie->refcount++;
tattr = pth_attr_new();
pth_attr_set (tattr, PTH_ATTR_JOINABLE, 0);
pth_attr_set (tattr, PTH_ATTR_STACK_SIZE, 128*1024);
pth_attr_set (tattr, PTH_ATTR_NAME, "ldap-wrapper");
if (pth_spawn (tattr, ldap_wrapper_thread, parms))
parms = NULL; /* Now owned by the thread. */
else
res = npth_attr_init(&tattr);
if (res)
{
err = gpg_error_from_syserror ();
log_error ("error spawning ldap wrapper thread: %s\n",
strerror (errno) );
err = gpg_error_from_errno (res);
ksba_reader_release (reader);
free_arg_list (parms->arg_list);
es_fclose (parms->outstream);
xfree (parms);
return err;
}
pth_attr_destroy (tattr);
npth_attr_setdetachstate (&tattr, NPTH_CREATE_DETACHED);
res = npth_create (&thread, &tattr, ldap_wrapper_thread, parms);
npth_attr_destroy (&tattr);
if (res)
{
err = gpg_error_from_errno (res);
log_error ("error spawning ldap wrapper thread: %s\n",
strerror (res) );
}
else
parms = NULL; /* Now owned by the thread. */
if (parms)
{
free_arg_list (parms->arg_list);