From 1ddd69935da629188dcf9215cd9e7a8f68b34a97 Mon Sep 17 00:00:00 2001 From: NIIBE Yutaka Date: Tue, 25 Jul 2023 12:02:26 +0900 Subject: [PATCH] gpg: Add parallelized filter for hashing. * g10/options.h (COMPAT_PARALLELIZED): New. * g10/filter.h (md_thd_filter_context_t): New type. (md_thd_filter_set_md, md_thd_filter): New. * g10/gpg.c (compatibility_flags): Update to support COMPAT_PARALLELIZED. * g10/mdfilter.c (struct md_thd_filter_context): New. (lock_md, unlock_md, get_buffer_to_hash, put_buffer_to_recv): New. (get_buffer_to_fill, put_buffer_to_send, md_thread): New. (md_thd_filter, md_thd_filter_set_md): New. * g10/sign.c (sign_file): Add support for md_thd_filter. (sign_symencrypt_file): Likewise. -- GnuPG-bug-id: 6570 Signed-off-by: NIIBE Yutaka --- g10/filter.h | 4 + g10/gpg.c | 1 + g10/mdfilter.c | 295 +++++++++++++++++++++++++++++++++++++++++++++++++ g10/options.h | 4 +- g10/sign.c | 64 ++++++++--- 5 files changed, 352 insertions(+), 16 deletions(-) diff --git a/g10/filter.h b/g10/filter.h index 4b4fc55ff..321b553dc 100644 --- a/g10/filter.h +++ b/g10/filter.h @@ -29,6 +29,9 @@ typedef struct { size_t maxbuf_size; } md_filter_context_t; +typedef struct md_thd_filter_context *md_thd_filter_context_t; +void md_thd_filter_set_md (md_thd_filter_context_t mfx, gcry_md_hd_t md); + typedef struct { int refcount; /* Initialized to 1. */ @@ -165,6 +168,7 @@ typedef struct { /*-- mdfilter.c --*/ int md_filter( void *opaque, int control, iobuf_t a, byte *buf, size_t *ret_len); +int md_thd_filter( void *opaque, int control, iobuf_t a, byte *buf, size_t *ret_len); void free_md_filter_context( md_filter_context_t *mfx ); /*-- armor.c --*/ diff --git a/g10/gpg.c b/g10/gpg.c index 23bf8d971..8b3d79e1e 100644 --- a/g10/gpg.c +++ b/g10/gpg.c @@ -1027,6 +1027,7 @@ static struct debug_flags_s debug_flags [] = /* The list of compatibility flags. */ static struct compatibility_flags_s compatibility_flags [] = { + { COMPAT_PARALLELIZED, "parallelized" }, { 0, NULL } }; diff --git a/g10/mdfilter.c b/g10/mdfilter.c index f3318f15c..a655d6d72 100644 --- a/g10/mdfilter.c +++ b/g10/mdfilter.c @@ -22,6 +22,7 @@ #include #include #include +#include #include "gpg.h" #include "../common/status.h" @@ -71,3 +72,297 @@ free_md_filter_context( md_filter_context_t *mfx ) mfx->md2 = NULL; mfx->maxbuf_size = 0; } + + +/**************** + * Threaded implementation for hashing. + */ + +struct md_thd_filter_context { + gcry_md_hd_t md; + npth_t thd; + /**/ + npth_mutex_t mutex; + npth_cond_t cond; + size_t bufsize; + unsigned int produce : 1; + unsigned int consume : 1; + ssize_t written0; + ssize_t written1; + unsigned char buf[1]; +}; + + +static void +lock_md (struct md_thd_filter_context *mfx) +{ + int rc = npth_mutex_lock (&mfx->mutex); + if (rc) + log_fatal ("%s: failed to acquire mutex: %s\n", __func__, + gpg_strerror (gpg_error_from_errno (rc))); +} + + +static void +unlock_md (struct md_thd_filter_context * mfx) +{ + int rc = npth_mutex_unlock (&mfx->mutex); + if (rc) + log_fatal ("%s: failed to release mutex: %s\n", __func__, + gpg_strerror (gpg_error_from_errno (rc))); +} + +static int +get_buffer_to_hash (struct md_thd_filter_context *mfx, + unsigned char **r_buf, size_t *r_len) +{ + int rc = 0; + + lock_md (mfx); + + if ((mfx->consume == 0 && mfx->written0 < 0) + || (mfx->consume != 0 && mfx->written1 < 0)) + { + rc = npth_cond_wait (&mfx->cond, &mfx->mutex); + if (rc) + { + unlock_md (mfx); + return -1; + } + } + + if (mfx->consume == 0) + { + *r_buf = mfx->buf; + *r_len = mfx->written0; + } + else + { + *r_buf = mfx->buf + mfx->bufsize; + *r_len = mfx->written1; + } + + unlock_md (mfx); + + return 0; +} + +static int +put_buffer_to_recv (struct md_thd_filter_context *mfx) +{ + int rc = 0; + + lock_md (mfx); + if (mfx->consume == 0) + { + mfx->written0 = -1; + mfx->consume = 1; + } + else + { + mfx->written1 = -1; + mfx->consume = 0; + } + + rc = npth_cond_signal (&mfx->cond); + if (rc) + { + unlock_md (mfx); + return -1; + } + + unlock_md (mfx); + return 0; +} + +static int +get_buffer_to_fill (struct md_thd_filter_context *mfx, + unsigned char **r_buf, size_t len) +{ + lock_md (mfx); + + if (len > mfx->bufsize) + { + unlock_md (mfx); + return GPG_ERR_BUFFER_TOO_SHORT; + } + + if ((mfx->produce == 0 && mfx->written0 >= 0) + || (mfx->produce != 0 && mfx->written1 >= 0)) + { + int rc = npth_cond_wait (&mfx->cond, &mfx->mutex); + if (rc) + { + unlock_md (mfx); + return gpg_error_from_errno (rc); + } + } + + if (mfx->produce == 0) + *r_buf = mfx->buf; + else + *r_buf = mfx->buf + mfx->bufsize; + unlock_md (mfx); + return 0; +} + +static int +put_buffer_to_send (struct md_thd_filter_context *mfx, size_t len) +{ + int rc; + + lock_md (mfx); + if (mfx->produce == 0) + { + mfx->written0 = len; + mfx->produce = 1; + } + else + { + mfx->written1 = len; + mfx->produce = 0; + } + + rc = npth_cond_signal (&mfx->cond); + if (rc) + { + unlock_md (mfx); + return gpg_error_from_errno (rc); + } + + unlock_md (mfx); + + /* Yield to the md_thread to let it compute the hash in parallel */ + npth_usleep (0); + return 0; +} + + +static void * +md_thread (void *arg) +{ + struct md_thd_filter_context *mfx = arg; + + while (1) + { + unsigned char *buf; + size_t len; + + if (get_buffer_to_hash (mfx, &buf, &len) < 0) + /* Error */ + return NULL; + + if (len == 0) + break; + + npth_unprotect (); + gcry_md_write (mfx->md, buf, len); + npth_protect (); + + if (put_buffer_to_recv (mfx) < 0) + /* Error */ + return NULL; + } + + return NULL; +} + +int +md_thd_filter (void *opaque, int control, + IOBUF a, byte *buf, size_t *ret_len) +{ + size_t size = *ret_len; + struct md_thd_filter_context **r_mfx = opaque; + struct md_thd_filter_context *mfx = *r_mfx; + int rc=0; + + if (control == IOBUFCTRL_INIT) + { + npth_attr_t tattr; + size_t n; + + n = 2 * iobuf_set_buffer_size (0) * 1024; + mfx = xtrymalloc (n + offsetof (struct md_thd_filter_context, buf)); + if (!mfx) + return gpg_error_from_syserror (); + *r_mfx = mfx; + mfx->bufsize = n / 2; + mfx->consume = mfx->produce = 0; + mfx->written0 = -1; + mfx->written1 = -1; + + rc = npth_mutex_init (&mfx->mutex, NULL); + if (rc) + { + return gpg_error_from_errno (rc); + } + rc = npth_cond_init (&mfx->cond, NULL); + if (rc) + { + npth_mutex_destroy (&mfx->mutex); + return gpg_error_from_errno (rc); + } + rc = npth_attr_init (&tattr); + if (rc) + { + npth_cond_destroy (&mfx->cond); + npth_mutex_destroy (&mfx->mutex); + return gpg_error_from_errno (rc); + } + npth_attr_setdetachstate (&tattr, NPTH_CREATE_JOINABLE); + rc = npth_create (&mfx->thd, &tattr, md_thread, mfx); + if (rc) + { + npth_cond_destroy (&mfx->cond); + npth_mutex_destroy (&mfx->mutex); + npth_attr_destroy (&tattr); + return gpg_error_from_errno (rc); + } + npth_attr_destroy (&tattr); + } + else if (control == IOBUFCTRL_UNDERFLOW) + { + int i; + unsigned char *md_buf = NULL; + + i = iobuf_read (a, buf, size); + if (i == -1) + i = 0; + + rc = get_buffer_to_fill (mfx, &md_buf, i); + if (rc) + return rc; + + if (i) + memcpy (md_buf, buf, i); + + rc = put_buffer_to_send (mfx, i); + if (rc) + return rc; + + if (i == 0) + { + npth_join (mfx->thd, NULL); + rc = -1; /* eof */ + } + + *ret_len = i; + } + else if (control == IOBUFCTRL_FREE) + { + npth_cond_destroy (&mfx->cond); + npth_mutex_destroy (&mfx->mutex); + xfree (mfx); + *r_mfx = NULL; + } + else if (control == IOBUFCTRL_DESC) + mem2str (buf, "md_thd_filter", *ret_len); + + return rc; +} + +void +md_thd_filter_set_md (struct md_thd_filter_context *mfx, gcry_md_hd_t md) +{ + mfx->md = md; +} diff --git a/g10/options.h b/g10/options.h index 914c24849..327a6a06f 100644 --- a/g10/options.h +++ b/g10/options.h @@ -373,7 +373,9 @@ EXTERN_UNLESS_MAIN_MODULE int memory_debug_mode; EXTERN_UNLESS_MAIN_MODULE int memory_stat_debug_mode; /* Compatibility flags */ -/* #define COMPAT_FOO 1 */ +#define COMPAT_PARALLELIZED 1 + +/* #define COMPAT_FOO 2 */ /* Compliance test macors. */ diff --git a/g10/sign.c b/g10/sign.c index 5588557c8..f68ea3bc7 100644 --- a/g10/sign.c +++ b/g10/sign.c @@ -1020,7 +1020,9 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr, const char *fname; armor_filter_context_t *afx; compress_filter_context_t zfx; + gcry_md_hd_t md; md_filter_context_t mfx; + md_thd_filter_context_t mfx2 = NULL; text_filter_context_t tfx; progress_filter_context_t *pfx; encrypt_filter_context_t efx; @@ -1126,10 +1128,10 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr, iobuf_push_filter (inp, text_filter, &tfx); } - if (gcry_md_open (&mfx.md, 0, 0)) + if (gcry_md_open (&md, 0, 0)) BUG (); if (DBG_HASHING) - gcry_md_debug (mfx.md, "sign"); + gcry_md_debug (md, "sign"); /* If we're encrypting and signing, it is reasonable to pick the * hash algorithm to use out of the recipient key prefs. This is @@ -1226,10 +1228,21 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr, } for (sk_rover = sk_list; sk_rover; sk_rover = sk_rover->next) - gcry_md_enable (mfx.md, hash_for (sk_rover->pk)); + gcry_md_enable (md, hash_for (sk_rover->pk)); if (!multifile) - iobuf_push_filter (inp, md_filter, &mfx); + { + if (encryptflag && (opt.compat_flags & COMPAT_PARALLELIZED)) + { + iobuf_push_filter (inp, md_thd_filter, &mfx2); + md_thd_filter_set_md (mfx2, md); + } + else + { + iobuf_push_filter (inp, md_filter, &mfx); + mfx.md = md; + } + } if (detached && !encryptflag) afx->what = 2; @@ -1292,7 +1305,7 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr, goto leave; } - write_status_begin_signing (mfx.md); + write_status_begin_signing (md); /* Setup the inner packet. */ if (detached) @@ -1332,7 +1345,16 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr, memset (&tfx, 0, sizeof tfx); iobuf_push_filter (inp, text_filter, &tfx); } - iobuf_push_filter (inp, md_filter, &mfx); + if (encryptflag && (opt.compat_flags & COMPAT_PARALLELIZED)) + { + iobuf_push_filter (inp, md_thd_filter, &mfx2); + md_thd_filter_set_md (mfx2, md); + } + else + { + iobuf_push_filter (inp, md_filter, &mfx); + mfx.md = md; + } while (iobuf_read (inp, NULL, iobuf_size) != -1) ; iobuf_close (inp); @@ -1361,7 +1383,7 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr, goto leave; /* Write the signatures. */ - rc = write_signature_packets (ctrl, sk_list, out, mfx.md, extrahash, + rc = write_signature_packets (ctrl, sk_list, out, md, extrahash, opt.textmode && !outfile? 0x01 : 0x00, 0, duration, detached ? 'D':'S', NULL); if (rc) @@ -1378,7 +1400,7 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr, write_status (STATUS_END_ENCRYPTION); } iobuf_close (inp); - gcry_md_close (mfx.md); + gcry_md_close (md); release_sk_list (sk_list); release_pk_list (pk_list); recipient_digest_algo = 0; @@ -1561,6 +1583,8 @@ sign_symencrypt_file (ctrl_t ctrl, const char *fname, strlist_t locusr) progress_filter_context_t *pfx; compress_filter_context_t zfx; md_filter_context_t mfx; + md_thd_filter_context_t mfx2 = NULL; + gcry_md_hd_t md; text_filter_context_t tfx; cipher_filter_context_t cfx; iobuf_t inp = NULL; @@ -1644,15 +1668,25 @@ sign_symencrypt_file (ctrl_t ctrl, const char *fname, strlist_t locusr) /* Prepare to calculate the MD over the input. */ if (opt.textmode) iobuf_push_filter (inp, text_filter, &tfx); - if (gcry_md_open (&mfx.md, 0, 0)) + if (gcry_md_open (&md, 0, 0)) BUG (); if (DBG_HASHING) - gcry_md_debug (mfx.md, "symc-sign"); + gcry_md_debug (md, "symc-sign"); for (sk_rover = sk_list; sk_rover; sk_rover = sk_rover->next) - gcry_md_enable (mfx.md, hash_for (sk_rover->pk)); + gcry_md_enable (md, hash_for (sk_rover->pk)); + + if ((opt.compat_flags & COMPAT_PARALLELIZED)) + { + iobuf_push_filter (inp, md_thd_filter, &mfx2); + md_thd_filter_set_md (mfx2, md); + } + else + { + iobuf_push_filter (inp, md_filter, &mfx); + mfx.md = md; + } - iobuf_push_filter (inp, md_filter, &mfx); /* Push armor output filter */ if (opt.armor) @@ -1694,7 +1728,7 @@ sign_symencrypt_file (ctrl_t ctrl, const char *fname, strlist_t locusr) if (rc) goto leave; - write_status_begin_signing (mfx.md); + write_status_begin_signing (md); /* Pipe data through all filters; i.e. write the signed stuff. */ /* (current filters: zip - encrypt - armor) */ @@ -1706,7 +1740,7 @@ sign_symencrypt_file (ctrl_t ctrl, const char *fname, strlist_t locusr) /* Write the signatures. */ /* (current filters: zip - encrypt - armor) */ - rc = write_signature_packets (ctrl, sk_list, out, mfx.md, extrahash, + rc = write_signature_packets (ctrl, sk_list, out, md, extrahash, opt.textmode? 0x01 : 0x00, 0, duration, 'S', NULL); if (rc) @@ -1723,7 +1757,7 @@ sign_symencrypt_file (ctrl_t ctrl, const char *fname, strlist_t locusr) } iobuf_close (inp); release_sk_list (sk_list); - gcry_md_close (mfx.md); + gcry_md_close (md); xfree (cfx.dek); xfree (s2k); release_progress_context (pfx);