mirror of
https://github.com/upa/mscp.git
synced 2026-02-15 01:34:44 +08:00
change chunk_pool from list to pool
This commit is contained in:
73
src/mscp.c
73
src/mscp.c
@@ -6,7 +6,6 @@
|
||||
#include <semaphore.h>
|
||||
#include <sys/time.h>
|
||||
|
||||
#include <list.h>
|
||||
#include <pool.h>
|
||||
#include <minmax.h>
|
||||
#include <ssh.h>
|
||||
@@ -34,6 +33,7 @@ struct mscp_thread {
|
||||
|
||||
/* attributes used by scan thread */
|
||||
size_t total_bytes;
|
||||
bool finished;
|
||||
};
|
||||
|
||||
struct mscp {
|
||||
@@ -50,14 +50,15 @@ struct mscp {
|
||||
sftp_session first; /* first sftp session */
|
||||
|
||||
char dst_path[PATH_MAX];
|
||||
pool *src_pool, *path_pool;
|
||||
pool *thread_pool; /* mscp_threads for copy thread */
|
||||
|
||||
struct chunk_pool cp;
|
||||
pool *src_pool, *path_pool, *chunk_pool, *thread_pool;
|
||||
|
||||
struct mscp_thread scan; /* mscp_thread for mscp_scan_thread() */
|
||||
#define mscp_scan_is_finished(m) ((m)->scan.finished)
|
||||
};
|
||||
|
||||
|
||||
|
||||
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
|
||||
#define DEFAULT_NR_AHEAD 32
|
||||
#define DEFAULT_BUF_SZ 16384
|
||||
@@ -223,29 +224,28 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
|
||||
return NULL;
|
||||
}
|
||||
|
||||
m = malloc(sizeof(*m));
|
||||
if (!m) {
|
||||
if (!(m = malloc(sizeof(*m)))) {
|
||||
priv_set_errv("malloc: %s", strerrno());
|
||||
return NULL;
|
||||
}
|
||||
memset(m, 0, sizeof(*m));
|
||||
|
||||
m->src_pool = pool_new();
|
||||
if (!m->src_pool) {
|
||||
if (!(m->src_pool = pool_new())) {
|
||||
priv_set_errv("pool_new: %s", strerrno());
|
||||
goto free_out;
|
||||
}
|
||||
|
||||
m->path_pool = pool_new();
|
||||
if (!m->path_pool) {
|
||||
if (!(m->path_pool = pool_new())) {
|
||||
priv_set_errv("pool_new: %s", strerrno());
|
||||
goto free_out;
|
||||
}
|
||||
|
||||
chunk_pool_init(&m->cp);
|
||||
if (!(m->chunk_pool = pool_new())) {
|
||||
priv_set_errv("pool_new: %s", strerrno());
|
||||
goto free_out;
|
||||
}
|
||||
|
||||
m->thread_pool = pool_new();
|
||||
if (!m->thread_pool) {
|
||||
if (!(m->thread_pool = pool_new())) {
|
||||
priv_set_errv("pool_new: %s", strerrno());
|
||||
goto free_out;
|
||||
}
|
||||
@@ -255,8 +255,7 @@ struct mscp *mscp_init(const char *remote_host, int direction, struct mscp_opts
|
||||
goto free_out;
|
||||
}
|
||||
|
||||
m->remote = strdup(remote_host);
|
||||
if (!m->remote) {
|
||||
if (!(m->remote = strdup(remote_host))) {
|
||||
priv_set_errv("strdup: %s", strerrno());
|
||||
goto free_out;
|
||||
}
|
||||
@@ -285,8 +284,12 @@ free_out:
|
||||
pool_free(m->src_pool);
|
||||
if (m->path_pool)
|
||||
pool_free(m->path_pool);
|
||||
if (m->chunk_pool)
|
||||
pool_free(m->chunk_pool);
|
||||
if (m->thread_pool)
|
||||
pool_free(m->thread_pool);
|
||||
if (m->remote)
|
||||
free(m->remote);
|
||||
free(m);
|
||||
return NULL;
|
||||
}
|
||||
@@ -405,8 +408,8 @@ void *mscp_scan_thread(void *arg)
|
||||
a.dst_path_is_dir = true;
|
||||
}
|
||||
|
||||
a.cp = &m->cp;
|
||||
a.path_pool = m->path_pool;
|
||||
a.chunk_pool = m->chunk_pool;
|
||||
a.nr_conn = m->opts->nr_threads;
|
||||
a.min_chunk_sz = m->opts->min_chunk_sz;
|
||||
a.max_chunk_sz = m->opts->max_chunk_sz;
|
||||
@@ -443,13 +446,13 @@ void *mscp_scan_thread(void *arg)
|
||||
}
|
||||
|
||||
pr_info("walk source path(s) done");
|
||||
chunk_pool_set_filled(&m->cp);
|
||||
t->ret = 0;
|
||||
t->finished = true;
|
||||
return NULL;
|
||||
|
||||
err_out:
|
||||
chunk_pool_set_filled(&m->cp);
|
||||
t->ret = -1;
|
||||
t->finished = true;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@@ -461,6 +464,7 @@ int mscp_scan(struct mscp *m)
|
||||
memset(t, 0, sizeof(*t));
|
||||
t->m = m;
|
||||
t->sftp = m->first;
|
||||
t->finished = false;
|
||||
|
||||
ret = pthread_create(&t->tid, NULL, mscp_scan_thread, t);
|
||||
if (ret < 0) {
|
||||
@@ -473,8 +477,8 @@ int mscp_scan(struct mscp *m)
|
||||
* finished. If the number of chunks are smaller than
|
||||
* nr_threads, we adjust nr_threads to the number of chunks.
|
||||
*/
|
||||
while (!chunk_pool_is_filled(&m->cp) &&
|
||||
chunk_pool_size(&m->cp) < m->opts->nr_threads)
|
||||
while (!mscp_scan_is_finished(m) &&
|
||||
pool_size(m->chunk_pool) < m->opts->nr_threads)
|
||||
usleep(100);
|
||||
|
||||
return 0;
|
||||
@@ -527,7 +531,7 @@ int mscp_start(struct mscp *m)
|
||||
struct mscp_thread *t;
|
||||
int n, ret = 0;
|
||||
|
||||
if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) {
|
||||
if ((n = pool_size(m->chunk_pool)) < m->opts->nr_threads) {
|
||||
pr_notice("we have %d chunk(s), set number of connections to %d", n, n);
|
||||
m->opts->nr_threads = n;
|
||||
}
|
||||
@@ -613,7 +617,7 @@ void *mscp_copy_thread(void *arg)
|
||||
struct mscp_thread *t = arg;
|
||||
struct mscp *m = t->m;
|
||||
struct chunk *c;
|
||||
bool nomore;
|
||||
bool next_chunk_exist;
|
||||
|
||||
/* when error occurs, each thread prints error messages
|
||||
* immediately with pr_* functions. */
|
||||
@@ -631,7 +635,7 @@ void *mscp_copy_thread(void *arg)
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
if (!(nomore = chunk_pool_is_empty(&m->cp))) {
|
||||
if ((next_chunk_exist = pool_iter_check_next_lock(m->chunk_pool))) {
|
||||
if (m->opts->interval > 0)
|
||||
wait_for_interval(m->opts->interval);
|
||||
pr_notice("thread[%d]: connecting to %s", t->id, m->remote);
|
||||
@@ -643,7 +647,7 @@ void *mscp_copy_thread(void *arg)
|
||||
goto err_out;
|
||||
}
|
||||
|
||||
if (nomore) {
|
||||
if (!next_chunk_exist) {
|
||||
pr_notice("thread[%d]: no more connections needed", t->id);
|
||||
goto out;
|
||||
}
|
||||
@@ -668,15 +672,17 @@ void *mscp_copy_thread(void *arg)
|
||||
}
|
||||
|
||||
while (1) {
|
||||
c = chunk_pool_pop(&m->cp);
|
||||
if (c == CHUNK_POP_WAIT) {
|
||||
usleep(100); /* XXX: hard code */
|
||||
continue;
|
||||
c = pool_iter_next_lock(m->chunk_pool);
|
||||
if (c == NULL) {
|
||||
if (!mscp_scan_is_finished(m)) {
|
||||
/* scan is not finished, wait. */
|
||||
usleep(100);
|
||||
continue;
|
||||
}
|
||||
/* scan is finished, and no more chunks */
|
||||
break;
|
||||
}
|
||||
|
||||
if (!c)
|
||||
break; /* no more chunks */
|
||||
|
||||
if ((t->ret = copy_chunk(c, src_sftp, dst_sftp, m->opts->nr_ahead,
|
||||
m->opts->buf_sz, m->opts->preserve_ts,
|
||||
&t->copied_bytes)) < 0)
|
||||
@@ -710,10 +716,7 @@ void mscp_cleanup(struct mscp *m)
|
||||
|
||||
pool_zeroize(m->src_pool, free);
|
||||
pool_zeroize(m->path_pool, (pool_map_f)free_path);
|
||||
|
||||
chunk_pool_release(&m->cp);
|
||||
chunk_pool_init(&m->cp);
|
||||
|
||||
pool_zeroize(m->chunk_pool, free);
|
||||
pool_zeroize(m->thread_pool, free);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user