mirror of
https://github.com/upa/mscp.git
synced 2026-02-04 03:24:58 +08:00
use sigalrm for printing progress bar
This commit is contained in:
215
src/main.c
215
src/main.c
@@ -28,6 +28,16 @@
|
||||
/* XXX: need to investigate max buf size for sftp_read/sftp_write */
|
||||
#define DEFAULT_NR_AHEAD 16
|
||||
|
||||
struct mscp_thread {
|
||||
sftp_session sftp;
|
||||
|
||||
pthread_t tid;
|
||||
int cpu;
|
||||
size_t done; /* copied bytes */
|
||||
bool finished;
|
||||
int ret;
|
||||
};
|
||||
|
||||
struct mscp {
|
||||
char *host; /* remote host (and username) */
|
||||
struct ssh_opts *opts; /* ssh parameters */
|
||||
@@ -39,38 +49,27 @@ struct mscp {
|
||||
|
||||
char *target;
|
||||
|
||||
int nr_threads; /* number of threads */
|
||||
int sftp_buf_sz, io_buf_sz;
|
||||
int nr_ahead; /* # of ahead read command for remote to local copy */
|
||||
|
||||
struct timeval start; /* timestamp of starting copy */
|
||||
};
|
||||
|
||||
struct mscp_thread {
|
||||
struct mscp *mscp;
|
||||
sftp_session sftp;
|
||||
|
||||
pthread_t tid;
|
||||
int cpu;
|
||||
size_t done; /* copied bytes */
|
||||
bool finished;
|
||||
int ret;
|
||||
};
|
||||
struct mscp_thread *threads;
|
||||
} m;
|
||||
|
||||
void *mscp_copy_thread(void *arg);
|
||||
void *mscp_monitor_thread(void *arg);
|
||||
int mscp_stat_init();
|
||||
void mscp_stat_final();
|
||||
|
||||
|
||||
pthread_t mtid;
|
||||
struct mscp_thread *threads;
|
||||
int nr_threads;
|
||||
|
||||
void stop_copy_threads(int sig)
|
||||
{
|
||||
int n;
|
||||
|
||||
pr("stopping...\n");
|
||||
for (n = 0; n < nr_threads; n++) {
|
||||
if (!threads[n].finished)
|
||||
pthread_cancel(threads[n].tid);
|
||||
for (n = 0; n < m.nr_threads; n++) {
|
||||
if (!m.threads[n].finished)
|
||||
pthread_cancel(m.threads[n].tid);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -221,7 +220,6 @@ int expand_coremask(const char *coremask, int **cores, int *nr_cores)
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
struct mscp m;
|
||||
struct ssh_opts opts;
|
||||
int min_chunk_sz = DEFAULT_MIN_CHUNK_SZ;
|
||||
int max_chunk_sz = 0;
|
||||
@@ -241,14 +239,14 @@ int main(int argc, char **argv)
|
||||
m.io_buf_sz = DEFAULT_IO_BUF_SZ;
|
||||
m.nr_ahead = DEFAULT_NR_AHEAD;
|
||||
|
||||
nr_threads = (int)(nr_cpus() / 2);
|
||||
nr_threads = nr_threads == 0 ? 1 : nr_threads;
|
||||
m.nr_threads = (int)(nr_cpus() / 2);
|
||||
m.nr_threads = m.nr_threads == 0 ? 1 : m.nr_threads;
|
||||
|
||||
while ((ch = getopt(argc, argv, "n:m:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) {
|
||||
switch (ch) {
|
||||
case 'n':
|
||||
nr_threads = atoi(optarg);
|
||||
if (nr_threads < 1) {
|
||||
m.nr_threads = atoi(optarg);
|
||||
if (m.nr_threads < 1) {
|
||||
pr_err("invalid number of connections: %s\n", optarg);
|
||||
return 1;
|
||||
}
|
||||
@@ -396,7 +394,7 @@ int main(int argc, char **argv)
|
||||
|
||||
/* fill chunk list */
|
||||
ret = chunk_fill(&m.file_list, &m.chunk_list,
|
||||
nr_threads, min_chunk_sz, max_chunk_sz);
|
||||
m.nr_threads, min_chunk_sz, max_chunk_sz);
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
|
||||
@@ -408,16 +406,15 @@ int main(int argc, char **argv)
|
||||
return 0;
|
||||
|
||||
/* prepare thread instances */
|
||||
if ((n = list_count(&m.chunk_list)) < nr_threads) {
|
||||
pprint3("we have only %d chunk(s). set nr_conns to %d\n", n, n);
|
||||
nr_threads = n;
|
||||
if ((n = list_count(&m.chunk_list)) < m.nr_threads) {
|
||||
pprint3("we have only %d chunk(s). set NR_CONNECTIONS to %d\n", n, n);
|
||||
m.nr_threads = n;
|
||||
}
|
||||
|
||||
threads = calloc(nr_threads, sizeof(struct mscp_thread));
|
||||
memset(threads, 0, nr_threads * sizeof(struct mscp_thread));
|
||||
for (n = 0; n < nr_threads; n++) {
|
||||
struct mscp_thread *t = &threads[n];
|
||||
t->mscp = &m;
|
||||
m.threads = calloc(m.nr_threads, sizeof(struct mscp_thread));
|
||||
memset(m.threads, 0, m.nr_threads * sizeof(struct mscp_thread));
|
||||
for (n = 0; n < m.nr_threads; n++) {
|
||||
struct mscp_thread *t = &m.threads[n];
|
||||
t->finished = false;
|
||||
if (!coremask)
|
||||
t->cpu = -1;
|
||||
@@ -432,10 +429,8 @@ int main(int argc, char **argv)
|
||||
}
|
||||
}
|
||||
|
||||
/* spawn count thread */
|
||||
ret = pthread_create(&mtid, NULL, mscp_monitor_thread, &m);
|
||||
if (ret < 0) {
|
||||
pr_err("pthread_create error: %d\n", ret);
|
||||
/* init mscp stat for printing progress bar */
|
||||
if (mscp_stat_init() < 0) {
|
||||
stop_copy_threads(0);
|
||||
ret = 1;
|
||||
goto join_out;
|
||||
@@ -448,12 +443,9 @@ int main(int argc, char **argv)
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* save start time */
|
||||
gettimeofday(&m.start, NULL);
|
||||
|
||||
/* spawn threads */
|
||||
for (n = 0; n < nr_threads; n++) {
|
||||
struct mscp_thread *t = &threads[n];
|
||||
/* spawn copy threads */
|
||||
for (n = 0; n < m.nr_threads; n++) {
|
||||
struct mscp_thread *t = &m.threads[n];
|
||||
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
|
||||
if (ret < 0) {
|
||||
pr_err("pthread_create error: %d\n", ret);
|
||||
@@ -465,18 +457,17 @@ int main(int argc, char **argv)
|
||||
|
||||
join_out:
|
||||
/* waiting for threads join... */
|
||||
for (n = 0; n < nr_threads; n++)
|
||||
if (threads[n].tid) {
|
||||
pthread_join(threads[n].tid, NULL);
|
||||
if (threads[n].ret < 0)
|
||||
ret = threads[n].ret;
|
||||
for (n = 0; n < m.nr_threads; n++) {
|
||||
if (m.threads[n].tid) {
|
||||
pthread_join(m.threads[n].tid, NULL);
|
||||
if (m.threads[n].ret < 0)
|
||||
ret = m.threads[n].ret;
|
||||
}
|
||||
|
||||
if (mtid != 0) {
|
||||
pthread_cancel(mtid);
|
||||
pthread_join(mtid, NULL);
|
||||
}
|
||||
|
||||
/* print final result */
|
||||
mscp_stat_final();
|
||||
|
||||
out:
|
||||
if (m.ctrl)
|
||||
ssh_sftp_close(m.ctrl);
|
||||
@@ -496,7 +487,6 @@ void mscp_copy_thread_cleanup(void *arg)
|
||||
void *mscp_copy_thread(void *arg)
|
||||
{
|
||||
struct mscp_thread *t = arg;
|
||||
struct mscp *m = t->mscp;
|
||||
sftp_session sftp = t->sftp;
|
||||
struct chunk *c;
|
||||
|
||||
@@ -510,9 +500,9 @@ void *mscp_copy_thread(void *arg)
|
||||
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
||||
|
||||
while (1) {
|
||||
lock_acquire(&m->chunk_lock);
|
||||
c = chunk_acquire(&m->chunk_list);
|
||||
lock_release(&m->chunk_lock);
|
||||
lock_acquire(&m.chunk_lock);
|
||||
c = chunk_acquire(&m.chunk_list);
|
||||
lock_release(&m.chunk_lock);
|
||||
|
||||
if (!c)
|
||||
break; /* no more chunks */
|
||||
@@ -520,8 +510,8 @@ void *mscp_copy_thread(void *arg)
|
||||
if ((t->ret = chunk_prepare(c, sftp)) < 0)
|
||||
break;
|
||||
|
||||
if ((t->ret = chunk_copy(c, sftp, m->sftp_buf_sz, m->io_buf_sz,
|
||||
m->nr_ahead, &t->done)) < 0)
|
||||
if ((t->ret = chunk_copy(c, sftp, m.sftp_buf_sz, m.io_buf_sz,
|
||||
m.nr_ahead, &t->done)) < 0)
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -534,6 +524,9 @@ void *mscp_copy_thread(void *arg)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/* progress bar-related functions */
|
||||
|
||||
static double calculate_timedelta(struct timeval *b, struct timeval *a)
|
||||
{
|
||||
double sec, usec;
|
||||
@@ -555,16 +548,17 @@ static double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
|
||||
return (double)diff / calculate_timedelta(b, a);
|
||||
}
|
||||
|
||||
static char *calculate_eta(size_t tot, size_t done, struct timeval *s, struct timeval *n)
|
||||
static char *calculate_eta(size_t remain, size_t diff,
|
||||
struct timeval *b, struct timeval *a)
|
||||
{
|
||||
static char buf[16];
|
||||
double elapsed = calculate_timedelta(s, n);
|
||||
double elapsed = calculate_timedelta(b, a);
|
||||
double eta;
|
||||
|
||||
if (done == 0)
|
||||
if (diff == 0)
|
||||
snprintf(buf, sizeof(buf), "--:-- ETA");
|
||||
else {
|
||||
eta = (tot - done) / ((done / elapsed));
|
||||
eta = remain / (diff / elapsed);
|
||||
snprintf(buf, sizeof(buf), "%02d:%02d ETA",
|
||||
(int)floor(eta / 60), (int)round(eta) % 60);
|
||||
}
|
||||
@@ -607,7 +601,7 @@ static void print_progress_bar(double percent, char *suffix)
|
||||
pprint1("%s%s", buf, suffix);
|
||||
}
|
||||
|
||||
static void print_progress(struct timeval *start, struct timeval *b, struct timeval *a,
|
||||
static void print_progress(struct timeval *b, struct timeval *a,
|
||||
size_t total, size_t last, size_t done)
|
||||
{
|
||||
char *bps_units[] = { "B/s ", "KB/s", "MB/s", "GB/s" };
|
||||
@@ -643,81 +637,58 @@ static void print_progress(struct timeval *start, struct timeval *b, struct time
|
||||
|
||||
snprintf(suffix, sizeof(suffix), "%lu%s/%lu%s %6.1f%s %s",
|
||||
done_round, byte_units[byte_du], total_round, byte_units[byte_tu],
|
||||
bps, bps_units[bps_u], calculate_eta(total, done, start, a));
|
||||
bps, bps_units[bps_u], calculate_eta(total - done, done - last, b, a));
|
||||
|
||||
print_progress_bar(percent, suffix);
|
||||
}
|
||||
|
||||
void mscp_monitor_thread_cleanup(void *arg)
|
||||
|
||||
struct mscp_stat {
|
||||
struct timeval start, before, after;
|
||||
size_t total;
|
||||
size_t last;
|
||||
size_t done;
|
||||
} s;
|
||||
|
||||
void mscp_stat_handler(int signum)
|
||||
{
|
||||
struct mscp *m = arg;
|
||||
struct timeval end;
|
||||
struct file *f;
|
||||
size_t total, done;
|
||||
int n;
|
||||
|
||||
total = done = 0;
|
||||
for (s.done = 0, n = 0; n < m.nr_threads; n++)
|
||||
s.done += m.threads[n].done;
|
||||
|
||||
gettimeofday(&end, NULL);
|
||||
gettimeofday(&s.after, NULL);
|
||||
alarm(1);
|
||||
|
||||
/* get total byte to be transferred */
|
||||
list_for_each_entry(f, &m->file_list, list) {
|
||||
total += f->size;
|
||||
}
|
||||
|
||||
/* get total byte transferred */
|
||||
for (n = 0; n < nr_threads; n++) {
|
||||
done += threads[n].done;
|
||||
}
|
||||
|
||||
print_progress(&m->start, &m->start, &end, total, 0, done);
|
||||
pprint(1, "\n"); /* the final ouput. we need \n */
|
||||
print_progress(&s.before, &s.after, s.total, s.last, s.done);
|
||||
s.before = s.after;
|
||||
s.last = s.done;
|
||||
}
|
||||
|
||||
void *mscp_monitor_thread(void *arg)
|
||||
int mscp_stat_init()
|
||||
{
|
||||
struct mscp *m = arg;
|
||||
struct timeval a, b;
|
||||
struct file *f;
|
||||
bool all_done;
|
||||
size_t total, done, last;
|
||||
int n;
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
||||
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
|
||||
pthread_cleanup_push(mscp_monitor_thread_cleanup, m);
|
||||
|
||||
/* get total byte to be transferred */
|
||||
total = 0;
|
||||
list_for_each_entry(f, &m->file_list, list) {
|
||||
total += f->size;
|
||||
memset(&s, 0, sizeof(s));
|
||||
list_for_each_entry(f, &m.file_list, list) {
|
||||
s.total += f->size;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
all_done = true;
|
||||
last = done = 0;
|
||||
|
||||
for (n = 0; n < nr_threads; n++) {
|
||||
last += threads[n].done;
|
||||
}
|
||||
gettimeofday(&b, NULL);
|
||||
|
||||
usleep(1000000);
|
||||
|
||||
for (n = 0; n < nr_threads; n++) {
|
||||
done += threads[n].done;
|
||||
if (!threads[n].finished)
|
||||
all_done = false;
|
||||
}
|
||||
gettimeofday(&a, NULL);
|
||||
|
||||
print_progress(&m->start, &b, &a, total, last, done);
|
||||
|
||||
if (all_done || total == done)
|
||||
break;
|
||||
if (signal(SIGALRM, mscp_stat_handler) == SIG_ERR) {
|
||||
pr_err("signal: %s\n", strerrno());
|
||||
return -1;
|
||||
}
|
||||
|
||||
pthread_cleanup_pop(1);
|
||||
gettimeofday(&s.start, NULL);
|
||||
s.before = s.start;
|
||||
alarm(1);
|
||||
|
||||
return NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mscp_stat_final()
|
||||
{
|
||||
alarm(0);
|
||||
mscp_stat_handler(0);
|
||||
alarm(0);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user