mirror of
https://github.com/upa/mscp.git
synced 2026-02-16 10:54:47 +08:00
remove unused code and introduce -b buf_sz option
This commit removes ifdef ASYNC_WRITE. So, mscp always depends on the patched libssh.
This commit is contained in:
100
src/file.c
100
src/file.c
@@ -492,7 +492,7 @@ int chunk_fill(struct list_head *file_list, struct list_head *chunk_list,
|
||||
chunk_sz = max_chunk_sz;
|
||||
else {
|
||||
chunk_sz = (f->size - (f->size % nr_conn)) / nr_conn;
|
||||
chunk_sz &= ~page_mask; /* align in page_sz */
|
||||
chunk_sz &= ~page_mask; /* align with page_sz */
|
||||
if (chunk_sz <= min_chunk_sz)
|
||||
chunk_sz = min_chunk_sz;
|
||||
}
|
||||
@@ -647,50 +647,11 @@ static sftp_file chunk_open_remote(const char *path, int flags, mode_t mode, siz
|
||||
return sf;
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO: handle case when read returns 0 (EOF).
|
||||
*/
|
||||
static int _chunk_copy_local_to_remote(struct chunk *c, int fd, sftp_file sf,
|
||||
size_t sftp_buf_sz, size_t io_buf_sz,
|
||||
size_t *counter)
|
||||
{
|
||||
ssize_t read_bytes, write_bytes, remaind;
|
||||
char buf[io_buf_sz];
|
||||
|
||||
for (remaind = c->len; remaind > 0;) {
|
||||
|
||||
read_bytes = read(fd, buf, min(remaind, io_buf_sz));
|
||||
if (read_bytes < 0) {
|
||||
pr_err("read: %s\n", strerrno());
|
||||
return -1;
|
||||
}
|
||||
|
||||
write_bytes = sftp_write2(sf, buf, read_bytes, sftp_buf_sz);
|
||||
if (write_bytes < 0) {
|
||||
pr_err("sftp_write: %d\n", sftp_get_error(sf->sftp));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (write_bytes < read_bytes) {
|
||||
pr_err("failed to write full bytes to %s\n", c->f->dst_path);
|
||||
return -1;
|
||||
}
|
||||
|
||||
*counter += write_bytes;
|
||||
remaind -= write_bytes;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#define XFER_BUF_SIZE 16384
|
||||
|
||||
#ifdef ASYNC_WRITE
|
||||
static int _chunk_copy_local_to_remote_async(struct chunk *c, int fd,
|
||||
sftp_file sf, int nr_ahead, size_t *counter)
|
||||
static int chunk_copy_local_to_remote_async(struct chunk *c, int fd, sftp_file sf,
|
||||
int nr_ahead, int buf_sz, size_t *counter)
|
||||
{
|
||||
ssize_t read_bytes, remaind, thrown;
|
||||
char buf[XFER_BUF_SIZE];
|
||||
char buf[buf_sz];
|
||||
int idx, ret;
|
||||
struct {
|
||||
uint32_t id;
|
||||
@@ -756,27 +717,19 @@ static int _chunk_copy_local_to_remote_async(struct chunk *c, int fd,
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int _chunk_copy_remote_to_local(struct chunk *c, int fd, sftp_file sf,
|
||||
int nr_ahead, size_t *counter)
|
||||
|
||||
static int chunk_copy_remote_to_local_async(struct chunk *c, int fd, sftp_file sf,
|
||||
int nr_ahead, int buf_sz, size_t *counter)
|
||||
{
|
||||
ssize_t read_bytes, write_bytes, remaind, thrown;
|
||||
char buf[XFER_BUF_SIZE];
|
||||
char buf[buf_sz];
|
||||
int idx;
|
||||
struct {
|
||||
int id;
|
||||
ssize_t len;
|
||||
} reqs[nr_ahead];
|
||||
|
||||
/* TODO: sftp_buf_sz has no effect on remote to local copy. we
|
||||
* always use 16384 byte buffer pointed by
|
||||
* https://api.libssh.org/stable/libssh_tutor_sftp.html. The
|
||||
* larget read length from sftp_async_read is 65536 byte.
|
||||
* Read sizes larget than 65536 cause a situation where data
|
||||
* remainds but sftp_async_read returns 0.
|
||||
*/
|
||||
|
||||
if (c->len == 0)
|
||||
return 0;
|
||||
|
||||
@@ -832,8 +785,7 @@ static int _chunk_copy_remote_to_local(struct chunk *c, int fd, sftp_file sf,
|
||||
}
|
||||
|
||||
static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp,
|
||||
size_t sftp_buf_sz, size_t io_buf_sz,
|
||||
int nr_ahead, size_t *counter)
|
||||
int nr_ahead, int buf_sz, size_t *counter)
|
||||
{
|
||||
struct file *f = c->f;
|
||||
sftp_file sf = NULL;
|
||||
@@ -856,12 +808,7 @@ static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp,
|
||||
goto out;
|
||||
}
|
||||
|
||||
#ifndef ASYNC_WRITE
|
||||
ret = _chunk_copy_local_to_remote(c, fd, sf, sftp_buf_sz, io_buf_sz,
|
||||
counter);
|
||||
#else
|
||||
ret = _chunk_copy_local_to_remote_async(c, fd, sf, nr_ahead, counter);
|
||||
#endif
|
||||
ret = chunk_copy_local_to_remote_async(c, fd, sf, nr_ahead, buf_sz, counter);
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
out:
|
||||
@@ -873,7 +820,7 @@ out:
|
||||
}
|
||||
|
||||
static int chunk_copy_remote_to_local(struct chunk *c, sftp_session sftp,
|
||||
int nr_ahead, size_t *counter)
|
||||
int nr_ahead, int buf_sz, size_t *counter)
|
||||
{
|
||||
struct file *f = c->f;
|
||||
sftp_file sf = NULL;
|
||||
@@ -896,7 +843,7 @@ static int chunk_copy_remote_to_local(struct chunk *c, sftp_session sftp,
|
||||
goto out;
|
||||
}
|
||||
|
||||
ret = _chunk_copy_remote_to_local(c, fd, sf, nr_ahead, counter);
|
||||
ret = chunk_copy_remote_to_local_async(c, fd, sf, nr_ahead, buf_sz, counter);
|
||||
if (ret< 0)
|
||||
goto out;
|
||||
|
||||
@@ -929,46 +876,31 @@ static int file_cleanup(struct file *f, sftp_session sftp)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int chunk_copy(struct chunk *c, sftp_session sftp, size_t sftp_buf_sz, size_t io_buf_sz,
|
||||
int nr_ahead, size_t *counter)
|
||||
int chunk_copy(struct chunk *c, sftp_session sftp, int nr_ahead, int buf_sz,
|
||||
size_t *counter)
|
||||
{
|
||||
struct file *f = c->f;
|
||||
int ret = 0;
|
||||
|
||||
pr_debug("copy %s %s -> %s %s off=0x%010lx\n",
|
||||
f->src_path, strloc(f->src_is_remote),
|
||||
f->dst_path, strloc(f->dst_is_remote), c->off);
|
||||
|
||||
pprint4("copy start: chunk %s 0x%010lx-0x%010lx %luB\n",
|
||||
c->f->src_path, c->off, c->off + c->len, c->len);
|
||||
|
||||
|
||||
if (f->dst_is_remote)
|
||||
ret = chunk_copy_local_to_remote(c, sftp, sftp_buf_sz, io_buf_sz,
|
||||
nr_ahead, counter);
|
||||
ret = chunk_copy_local_to_remote(c, sftp, nr_ahead, buf_sz, counter);
|
||||
else
|
||||
ret = chunk_copy_remote_to_local(c, sftp, nr_ahead, counter);
|
||||
ret = chunk_copy_remote_to_local(c, sftp, nr_ahead, buf_sz, counter);
|
||||
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
pr_debug("done %s %s -> %s %s off=0x%010lx\n",
|
||||
f->src_path, strloc(f->src_is_remote),
|
||||
f->dst_path, strloc(f->dst_is_remote), c->off);
|
||||
|
||||
pprint4("copy done: chunk %s 0x%010lx-0x%010lx %luB\n",
|
||||
c->f->src_path, c->off, c->off + c->len, c->len);
|
||||
|
||||
if (refcnt_dec(&f->refcnt) == 0) {
|
||||
sftp_session s, d;
|
||||
mode_t mode;
|
||||
|
||||
f->state = FILE_STATE_DONE;
|
||||
pprint2("copy done: %s\n", f->src_path);
|
||||
|
||||
ret = file_cleanup(f, sftp);
|
||||
}
|
||||
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -72,8 +72,9 @@ int chunk_fill(struct list_head *file_list, struct list_head *chunk_list,
|
||||
|
||||
struct chunk *chunk_acquire(struct list_head *chunk_list);
|
||||
int chunk_prepare(struct chunk *c, sftp_session sftp);
|
||||
int chunk_copy(struct chunk *c, sftp_session sftp, size_t sftp_buf_sz, size_t io_buf_sz,
|
||||
int nr_ahead, size_t *counter);
|
||||
int chunk_copy(struct chunk *c, sftp_session sftp, int nr_ahead, int buf_sz,
|
||||
size_t *counter);
|
||||
|
||||
|
||||
#ifdef DEBUG
|
||||
void file_dump(struct list_head *file_list);
|
||||
|
||||
79
src/main.c
79
src/main.c
@@ -23,10 +23,16 @@
|
||||
#endif
|
||||
|
||||
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
|
||||
#define DEFAULT_SFTP_BUF_SZ 131072 /* derived from qemu/block/ssh.c */
|
||||
#define DEFAULT_IO_BUF_SZ DEFAULT_SFTP_BUF_SZ
|
||||
/* XXX: need to investigate max buf size for sftp_read/sftp_write */
|
||||
#define DEFAULT_NR_AHEAD 16
|
||||
#define DEFAULT_BUF_SZ 16384
|
||||
/* XXX: we use 16384 byte buffer pointed by
|
||||
* https://api.libssh.org/stable/libssh_tutor_sftp.html. The larget
|
||||
* read length from sftp_async_read is 65536 byte. Read sizes larger
|
||||
* than 65536 cause a situation where data remainds but
|
||||
* sftp_async_read returns 0.
|
||||
*/
|
||||
|
||||
|
||||
|
||||
struct mscp_thread {
|
||||
sftp_session sftp;
|
||||
@@ -50,7 +56,7 @@ struct mscp {
|
||||
char *target;
|
||||
|
||||
int nr_threads; /* number of threads */
|
||||
int sftp_buf_sz, io_buf_sz;
|
||||
int buf_sz; /* i/o buf size */
|
||||
int nr_ahead; /* # of ahead read command for remote to local copy */
|
||||
|
||||
struct mscp_thread *threads;
|
||||
@@ -77,10 +83,7 @@ void usage(bool print_help) {
|
||||
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
|
||||
"\n"
|
||||
"Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]\n"
|
||||
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead]\n"
|
||||
#ifndef ASYNC_WRITE
|
||||
" [-b sftp_buf_sz] [-B io_buf_sz] \n"
|
||||
#endif
|
||||
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
|
||||
" [-l login_name] [-p port] [-i identity_file]\n"
|
||||
" [-c cipher_spec] [-M hmac_spec] source ... target\n"
|
||||
"\n");
|
||||
@@ -94,13 +97,7 @@ void usage(bool print_help) {
|
||||
" -S MAX_CHUNK_SIZE max chunk size (default: filesize / nr_conn)\n"
|
||||
"\n"
|
||||
" -a NR_AHEAD number of inflight SFTP commands (default: 16)\n"
|
||||
#ifndef ASYNC_WRITE
|
||||
" -b SFTP_BUF_SIZE buf size for sftp_read/write (default 131072B)\n"
|
||||
" -B IO_BUF_SIZE buf size for read/write (default 131072B)\n"
|
||||
" Note that the default value is derived from\n"
|
||||
" qemu/block/ssh.c. need investigation...\n"
|
||||
" -b and -B affect only local to remote copy\n"
|
||||
#endif
|
||||
" -b BUF_SZ buffer size for i/o and transfer\n"
|
||||
"\n"
|
||||
" -v increment verbose output level\n"
|
||||
" -q disable output\n"
|
||||
@@ -236,14 +233,12 @@ int main(int argc, char **argv)
|
||||
INIT_LIST_HEAD(&m.file_list);
|
||||
INIT_LIST_HEAD(&m.chunk_list);
|
||||
lock_init(&m.chunk_lock);
|
||||
m.sftp_buf_sz = DEFAULT_SFTP_BUF_SZ;
|
||||
m.io_buf_sz = DEFAULT_IO_BUF_SZ;
|
||||
m.nr_ahead = DEFAULT_NR_AHEAD;
|
||||
|
||||
m.buf_sz = DEFAULT_BUF_SZ;
|
||||
m.nr_threads = (int)(nr_cpus() / 2);
|
||||
m.nr_threads = m.nr_threads == 0 ? 1 : m.nr_threads;
|
||||
|
||||
while ((ch = getopt(argc, argv, "n:m:s:S:b:B:a:vqDl:p:i:c:M:CHdh")) != -1) {
|
||||
while ((ch = getopt(argc, argv, "n:m:s:S:a:b:vqDl:p:i:c:M:CHdh")) != -1) {
|
||||
switch (ch) {
|
||||
case 'n':
|
||||
m.nr_threads = atoi(optarg);
|
||||
@@ -285,20 +280,6 @@ int main(int argc, char **argv)
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
case 'b':
|
||||
m.sftp_buf_sz = atoi(optarg);
|
||||
if (m.sftp_buf_sz < 1) {
|
||||
pr_err("invalid buffer size: %s\n", optarg);
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
case 'B':
|
||||
m.io_buf_sz = atoi(optarg);
|
||||
if (m.io_buf_sz < 1) {
|
||||
pr_err("invalid buffer size: %s\n", optarg);
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
case 'a':
|
||||
m.nr_ahead = atoi(optarg);
|
||||
if (m.nr_ahead < 1) {
|
||||
@@ -306,6 +287,13 @@ int main(int argc, char **argv)
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
case 'b':
|
||||
m.buf_sz = atoi(optarg);
|
||||
if (m.buf_sz < 1) {
|
||||
pr_err("invalid buffer size: %s\n", optarg);
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
case 'v':
|
||||
verbose++;
|
||||
break;
|
||||
@@ -406,6 +394,10 @@ int main(int argc, char **argv)
|
||||
chunk_dump(&m.chunk_list);
|
||||
#endif
|
||||
|
||||
/* close the first sftp/ssh session */
|
||||
ssh_sftp_close(m.ctrl);
|
||||
m.ctrl = NULL;
|
||||
|
||||
if (dryrun)
|
||||
return 0;
|
||||
|
||||
@@ -429,15 +421,14 @@ int main(int argc, char **argv)
|
||||
t->sftp = ssh_init_sftp_session(m.host, m.opts);
|
||||
if (!t->sftp) {
|
||||
ret = 1;
|
||||
goto join_out;
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
/* init mscp stat for printing progress bar */
|
||||
if (mscp_stat_init() < 0) {
|
||||
stop_copy_threads(0);
|
||||
ret = 1;
|
||||
goto join_out;
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* register SIGINT to stop threads */
|
||||
@@ -514,8 +505,7 @@ void *mscp_copy_thread(void *arg)
|
||||
if ((t->ret = chunk_prepare(c, sftp)) < 0)
|
||||
break;
|
||||
|
||||
if ((t->ret = chunk_copy(c, sftp, m.sftp_buf_sz, m.io_buf_sz,
|
||||
m.nr_ahead, &t->done)) < 0)
|
||||
if ((t->ret = chunk_copy(c, sftp, m.nr_ahead, m.buf_sz, &t->done)) < 0)
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -531,7 +521,7 @@ void *mscp_copy_thread(void *arg)
|
||||
|
||||
/* progress bar-related functions */
|
||||
|
||||
static double calculate_timedelta(struct timeval *b, struct timeval *a)
|
||||
double calculate_timedelta(struct timeval *b, struct timeval *a)
|
||||
{
|
||||
double sec, usec;
|
||||
|
||||
@@ -547,13 +537,12 @@ static double calculate_timedelta(struct timeval *b, struct timeval *a)
|
||||
return sec;
|
||||
}
|
||||
|
||||
static double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
|
||||
double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
|
||||
{
|
||||
return (double)diff / calculate_timedelta(b, a);
|
||||
}
|
||||
|
||||
static char *calculate_eta(size_t remain, size_t diff,
|
||||
struct timeval *b, struct timeval *a)
|
||||
char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeval *a)
|
||||
{
|
||||
static char buf[16];
|
||||
double elapsed = calculate_timedelta(b, a);
|
||||
@@ -569,7 +558,7 @@ static char *calculate_eta(size_t remain, size_t diff,
|
||||
return buf;
|
||||
}
|
||||
|
||||
static void print_progress_bar(double percent, char *suffix)
|
||||
void print_progress_bar(double percent, char *suffix)
|
||||
{
|
||||
int n, thresh, bar_width;
|
||||
struct winsize ws;
|
||||
@@ -605,8 +594,8 @@ static void print_progress_bar(double percent, char *suffix)
|
||||
pprint1("%s%s", buf, suffix);
|
||||
}
|
||||
|
||||
static void print_progress(struct timeval *b, struct timeval *a,
|
||||
size_t total, size_t last, size_t done)
|
||||
void print_progress(struct timeval *b, struct timeval *a,
|
||||
size_t total, size_t last, size_t done)
|
||||
{
|
||||
char *bps_units[] = { "B/s ", "KB/s", "MB/s", "GB/s" };
|
||||
char *byte_units[] = { "B ", "KB", "MB", "GB", "TB", "PB" };
|
||||
|
||||
31
src/ssh.c
31
src/ssh.c
@@ -268,7 +268,7 @@ static int ssh_verify_known_hosts(ssh_session session)
|
||||
|
||||
case SSH_KNOWN_HOSTS_UNKNOWN:
|
||||
hexa = ssh_get_hexa(hash, hlen);
|
||||
fprintf(stderr,"The server is unknown. Do you trust the host key?\n");
|
||||
fprintf(stderr, "The server is unknown. Do you trust the host key?\n");
|
||||
fprintf(stderr, "Public key hash: %s\n", hexa);
|
||||
fprintf(stderr, "(yes/no): ");
|
||||
ssh_string_free_char(hexa);
|
||||
@@ -310,32 +310,3 @@ void ssh_sftp_close(sftp_session sftp)
|
||||
ssh_disconnect(ssh);
|
||||
ssh_free(ssh);
|
||||
}
|
||||
|
||||
|
||||
ssize_t sftp_write2(sftp_file sf, const void *buf, size_t len, size_t sftp_buf_sz)
|
||||
{
|
||||
ssize_t ret, nbytes;
|
||||
|
||||
for (nbytes = 0; nbytes < len;) {
|
||||
ret = sftp_write(sf, buf + nbytes,
|
||||
min(len - nbytes, sftp_buf_sz));
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
nbytes += ret;
|
||||
}
|
||||
return nbytes;
|
||||
}
|
||||
|
||||
ssize_t sftp_read2(sftp_file sf, void *buf, size_t len, size_t sftp_buf_sz)
|
||||
{
|
||||
ssize_t ret, nbytes;
|
||||
|
||||
for (nbytes = 0; nbytes < len;) {
|
||||
ret = sftp_read(sf, buf + nbytes,
|
||||
min(len - nbytes, sftp_buf_sz));
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
nbytes += ret;
|
||||
}
|
||||
return nbytes;
|
||||
}
|
||||
|
||||
@@ -30,8 +30,4 @@ void ssh_sftp_close(sftp_session sftp);
|
||||
#define sftp_ssh(sftp) (sftp)->session
|
||||
#define sftp_get_ssh_error(sftp) ssh_get_error(sftp_ssh(sftp))
|
||||
|
||||
/* wrapping multiple sftp_read|write */
|
||||
ssize_t sftp_write2(sftp_file sf, const void *buf, size_t len, size_t sftp_buf_sz);
|
||||
ssize_t sftp_read2(sftp_file sf, void *buf, size_t len, size_t sftp_buf_sz);
|
||||
|
||||
#endif /* _SSH_H_ */
|
||||
|
||||
Reference in New Issue
Block a user