change thread_list to thread_pool

This commit is contained in:
Ryo Nakamura
2024-02-10 22:34:03 +09:00
parent bfc955a9a7
commit d6f437bcb1
4 changed files with 39 additions and 46 deletions

View File

@@ -92,7 +92,6 @@ struct mscp_ssh_opts {
struct mscp_stats { struct mscp_stats {
size_t total; /** total bytes to be transferred */ size_t total; /** total bytes to be transferred */
size_t done; /** total bytes transferred */ size_t done; /** total bytes transferred */
bool finished; /** true when all copy threads finished */
}; };

View File

@@ -43,13 +43,10 @@ struct mscp {
size_t total_bytes; /* total bytes to be transferred */ size_t total_bytes; /* total bytes to be transferred */
struct list_head thread_list; pool *thread_pool;
rwlock thread_rwlock;
}; };
struct mscp_thread { struct mscp_thread {
struct list_head list; /* mscp->thread_list */
struct mscp *m; struct mscp *m;
int id; int id;
sftp_session sftp; sftp_session sftp;
@@ -246,8 +243,11 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
chunk_pool_init(&m->cp); chunk_pool_init(&m->cp);
INIT_LIST_HEAD(&m->thread_list); m->thread_pool = pool_new();
rwlock_init(&m->thread_rwlock); if (!m->thread_pool) {
priv_set_errv("pool_new: %s", strerrno());
goto free_out;
}
if ((m->sem = sem_create(o->max_startups)) == NULL) { if ((m->sem = sem_create(o->max_startups)) == NULL) {
priv_set_errv("sem_create: %s", strerrno()); priv_set_errv("sem_create: %s", strerrno());
@@ -284,6 +284,8 @@ free_out:
pool_free(m->src_pool); pool_free(m->src_pool);
if (m->path_pool) if (m->path_pool)
pool_free(m->path_pool); pool_free(m->path_pool);
if (m->thread_pool)
pool_free(m->thread_pool);
free(m); free(m);
return NULL; return NULL;
} }
@@ -343,13 +345,13 @@ static int get_page_mask(void)
static void mscp_stop_copy_thread(struct mscp *m) static void mscp_stop_copy_thread(struct mscp *m)
{ {
struct mscp_thread *t; struct mscp_thread *t;
unsigned int idx;
RWLOCK_READ_ACQUIRE(&m->thread_rwlock); pool_lock(m->thread_pool);
list_for_each_entry(t, &m->thread_list, list) { pool_for_each(m->thread_pool, t, idx) {
if (!t->finished) if (!t->finished)
pthread_cancel(t->tid); pthread_cancel(t->tid);
} }
RWLOCK_RELEASE(); pool_unlock(m->thread_pool);
} }
static void mscp_stop_scan_thread(struct mscp *m) static void mscp_stop_scan_thread(struct mscp *m)
@@ -369,7 +371,6 @@ void *mscp_scan_thread(void *arg)
struct mscp *m = arg; struct mscp *m = arg;
sftp_session src_sftp = NULL, dst_sftp = NULL; sftp_session src_sftp = NULL, dst_sftp = NULL;
struct path_resolve_args a; struct path_resolve_args a;
struct list_head tmp;
struct path *p; struct path *p;
struct stat ss, ds; struct stat ss, ds;
char *src_path; char *src_path;
@@ -529,10 +530,10 @@ int mscp_start(struct mscp *m)
t = mscp_copy_thread_spawn(m, n); t = mscp_copy_thread_spawn(m, n);
if (!t) if (!t)
break; break;
if (pool_push_lock(m->thread_pool, t) < 0) {
RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock); priv_set_errv("pool_push_lock: %s", strerrno());
list_add_tail(&t->list, &m->thread_list); break;
RWLOCK_RELEASE(); }
} }
return n; return n;
@@ -542,6 +543,7 @@ int mscp_join(struct mscp *m)
{ {
struct mscp_thread *t; struct mscp_thread *t;
struct path *p; struct path *p;
unsigned int idx;
size_t done = 0, nr_copied = 0, nr_tobe_copied = 0; size_t done = 0, nr_copied = 0, nr_tobe_copied = 0;
int n, ret = 0; int n, ret = 0;
@@ -549,9 +551,11 @@ int mscp_join(struct mscp *m)
ret = mscp_scan_join(m); ret = mscp_scan_join(m);
/* waiting for copy threads join... */ /* waiting for copy threads join... */
RWLOCK_READ_ACQUIRE(&m->thread_rwlock); pool_for_each(m->thread_pool, t, idx) {
list_for_each_entry(t, &m->thread_list, list) {
pthread_join(t->tid, NULL); pthread_join(t->tid, NULL);
}
pool_for_each(m->thread_pool, t, idx) {
done += t->done; done += t->done;
if (t->ret != 0) if (t->ret != 0)
ret = t->ret; ret = t->ret;
@@ -560,22 +564,19 @@ int mscp_join(struct mscp *m)
t->sftp = NULL; t->sftp = NULL;
} }
} }
RWLOCK_RELEASE();
if (m->first) {
ssh_sftp_close(m->first);
m->first = NULL;
}
/* count up number of transferred files */ /* count up number of transferred files */
pool_lock(m->path_pool);
pool_iter_for_each(m->path_pool, p) { pool_iter_for_each(m->path_pool, p) {
nr_tobe_copied++; nr_tobe_copied++;
if (p->state == FILE_STATE_DONE) { if (p->state == FILE_STATE_DONE) {
nr_copied++; nr_copied++;
} }
} }
pool_unlock();
if (m->first) {
ssh_sftp_close(m->first);
m->first = NULL;
}
pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes, pr_notice("%lu/%lu bytes copied for %lu/%lu files", done, m->total_bytes,
nr_copied, nr_tobe_copied); nr_copied, nr_tobe_copied);
@@ -706,13 +707,6 @@ out:
/* cleanup-related functions */ /* cleanup-related functions */
static void list_free_thread(struct list_head *list)
{
struct mscp_thread *t;
t = list_entry(list, typeof(*t), list);
free(t);
}
void mscp_cleanup(struct mscp *m) void mscp_cleanup(struct mscp *m)
{ {
if (m->first) { if (m->first) {
@@ -722,12 +716,11 @@ void mscp_cleanup(struct mscp *m)
pool_zeroize(m->src_pool, free); pool_zeroize(m->src_pool, free);
pool_zeroize(m->path_pool, (pool_map_f)free_path); pool_zeroize(m->path_pool, (pool_map_f)free_path);
chunk_pool_release(&m->cp); chunk_pool_release(&m->cp);
chunk_pool_init(&m->cp); chunk_pool_init(&m->cp);
RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock); pool_zeroize(m->thread_pool, free);
list_free_f(&m->thread_list, list_free_thread);
RWLOCK_RELEASE();
} }
void mscp_free(struct mscp *m) void mscp_free(struct mscp *m)
@@ -746,20 +739,13 @@ void mscp_free(struct mscp *m)
void mscp_get_stats(struct mscp *m, struct mscp_stats *s) void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
{ {
int nr_finished = 0, nr_threads = 0;
struct mscp_thread *t; struct mscp_thread *t;
unsigned int idx;
s->total = m->total_bytes; s->total = m->total_bytes;
s->done = 0; s->done = 0;
RWLOCK_READ_ACQUIRE(&m->thread_rwlock); pool_for_each(m->thread_pool, t, idx) {
list_for_each_entry(t, &m->thread_list, list) {
nr_threads++;
s->done += t->done; s->done += t->done;
if (t->finished)
nr_finished++;
} }
RWLOCK_RELEASE();
s->finished = nr_threads > 0 ? (nr_finished == nr_threads) : false;
} }

View File

@@ -61,6 +61,7 @@ int pool_push(pool *p, void *v)
p->array = new; p->array = new;
} }
p->array[p->num] = v; p->array[p->num] = v;
__sync_synchronize();
p->num++; p->num++;
return 0; return 0;
} }
@@ -88,6 +89,11 @@ void *pool_pop_lock(pool *p)
return v; return v;
} }
void *pool_get(pool *p, unsigned int idx)
{
return p->num <= idx ? NULL : p->array[idx];
}
void *pool_iter_next(pool *p) void *pool_iter_next(pool *p)
{ {
if (p->num <= p->idx) if (p->num <= p->idx)

View File

@@ -53,8 +53,10 @@ int pool_push_lock(pool *p, void *v);
void *pool_pop(pool *p); void *pool_pop(pool *p);
void *pool_pop_lock(pool *p); void *pool_pop_lock(pool *p);
/* pool_get() returns value indexed by idx */
void *pool_get(pool *p, unsigned int idx);
#define pool_size(p) ((p)->num) #define pool_size(p) ((p)->num)
#define pool_get(p, idx) ((p->num <= idx) ? NULL : p->array[idx])
/* /*
* pool->idx indicates next *v in an iteration. This has two * pool->idx indicates next *v in an iteration. This has two