15 Commits

Author SHA1 Message Date
Ryo Nakamura
9611b4d077 bump version to 0.1.2 2023-11-01 19:54:18 +09:00
Ryo Nakamura
2b9061f5f0 add --sysctl net.ipv6.conf.all.disable_ipv6=0 for docker run.
docker disables ipv6 on all interfaces inside containers by default,
even ::1 on lo. It causes testing mscp with IPv6 fails. Thus,
this commit disables disable_ipv6 via the --sysctl option.
2023-11-01 19:54:18 +09:00
Ryo Nakamura
8e590da322 fix parsing user@host:path.
This commit fixes issue #6. Now mscp command correctly parses
[x::x] IPv6 address notation in hostname.
2023-11-01 19:54:18 +09:00
Ryo Nakamura
b298b2ec35 main: adopt rolling average of recent eight bps values to calculate ETA 2023-11-01 19:54:18 +09:00
Ryo Nakamura
05a7e96759 main: call only mscp_stop() when receives sigint 2023-11-01 19:54:18 +09:00
Ryo Nakamura
139ba12f1a write total transferred bytes and number of files
at the end of output when serverity is notice.
2023-11-01 19:54:18 +09:00
Ryo Nakamura
cfbadebe6d change msg: thread[%d] to thread:%d 2023-11-01 19:54:18 +09:00
Ryo Nakamura
d7365683a9 print 1st decimal point in the progress bar 2023-11-01 19:54:18 +09:00
Ryo Nakamura
53a560b130 fix test_e2e for ccalgo and tiny fix on test_dir_copy_single 2023-11-01 19:54:18 +09:00
Ryo Nakamura
bf74aa095a add -g option to specify TCP cc algorithm
This commit introduce SSH_OPTIONS_CCALGO option to the libssh patch
and add -g CONGESTION option to mscp.
2023-11-01 19:54:18 +09:00
Ryo Nakamura
a88471fc43 Update README.md
add link to PEARC'23 paper
2023-09-11 19:56:33 +09:00
Ryo Nakamura
89e50453a8 bump version to 0.1.1 2023-09-08 17:28:36 +09:00
Ryo Nakamura
bc1cf11cc1 enable github actions on the dev branch 2023-09-08 17:20:45 +09:00
Ryo Nakamura
72841ec12d fix: use off_t for lseek 2023-09-08 17:19:13 +09:00
Ryo Nakamura
19704a7308 Update README.md
replace the demo mp4.
2023-09-07 15:38:45 +09:00
17 changed files with 439 additions and 122 deletions

View File

@@ -2,9 +2,9 @@ name: build on macOS
on:
push:
branches: [ "main" ]
branches: [ "main", "dev" ]
pull_request:
branches: [ "main" ]
branches: [ "main", "dev" ]
env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)

View File

@@ -2,9 +2,9 @@ name: build on ubuntu
on:
push:
branches: [ "main" ]
branches: [ "main", "dev" ]
pull_request:
branches: [ "main" ]
branches: [ "main", "dev" ]
env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)

View File

@@ -13,10 +13,10 @@ name: "CodeQL"
on:
push:
branches: [ "main" ]
branches: [ "main", "dev" ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ "main" ]
branches: [ "main", "dev" ]
schedule:
- cron: '35 11 * * 5'

View File

@@ -2,9 +2,9 @@ name: test
on:
push:
branches: [ "main" ]
branches: [ "main", "dev" ]
pull_request:
branches: [ "main" ]
branches: [ "main", "dev" ]
env:
BUILD_TYPE: Release

View File

@@ -221,7 +221,8 @@ foreach(x RANGE ${DIST_LISTLEN})
COMMENT "Test mscp in ${DOCKER_IMAGE} container"
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
COMMAND
docker run --init --rm ${DOCKER_IMAGE} /mscp/scripts/test-in-container.sh)
docker run --init --rm --sysctl net.ipv6.conf.all.disable_ipv6=0
${DOCKER_IMAGE} /mscp/scripts/test-in-container.sh)
add_custom_target(docker-pkg-${DOCKER_INDEX}
COMMENT "Retrieve mscp package from ${DOCKER_IMAGE} container"

View File

@@ -15,7 +15,8 @@ standard `sshd` supporting the SFTP subsystem (e.g. openssh-server),
and you need to be able to ssh to the hosts as usual. `mscp` does not
require anything else.
https://user-images.githubusercontent.com/184632/206889149-7cc6178a-6f0f-41e6-855c-d25e15a9abc5.mp4
https://github.com/upa/mscp/assets/184632/19230f57-be7f-4ef0-98dd-cb4c460f570d
--------------------------------------------------------------------
@@ -25,7 +26,8 @@ Differences from `scp` on usage:
- `-r` option is not needed to transfer directories.
- and any other differences I have not implemented and noticed.
Paper:
- Ryo Nakamura and Yohei Kuga. 2023. Multi-threaded scp: Easy and Fast File Transfer over SSH. In Practice and Experience in Advanced Research Computing (PEARC '23). Association for Computing Machinery, New York, NY, USA, 320323. https://doi.org/10.1145/3569951.3597582
## Install

View File

@@ -1 +1 @@
0.1.0
0.1.2

View File

@@ -56,6 +56,7 @@ struct mscp_opts {
#define MSCP_SSH_MAX_CIPHER_STR 32
#define MSCP_SSH_MAX_HMAC_STR 32
#define MSCP_SSH_MAX_COMP_STR 32 /* yes, no, zlib, zlib@openssh.com, none */
#define MSCP_SSH_MAX_CCALGO_STR 16
#define MSCP_SSH_MAX_PASSWORD 128
#define MSCP_SSH_MAX_PASSPHRASE 128
@@ -72,6 +73,7 @@ struct mscp_ssh_opts {
char cipher[MSCP_SSH_MAX_CIPHER_STR]; /** cipher spec */
char hmac[MSCP_SSH_MAX_HMAC_STR]; /** hmacp spec */
char compress[MSCP_SSH_MAX_COMP_STR]; /** yes, no, zlib@openssh.com */
char ccalgo[MSCP_SSH_MAX_CCALGO_STR]; /** TCP cc algorithm */
char password[MSCP_SSH_MAX_PASSWORD]; /** password auth passowrd */
char passphrase[MSCP_SSH_MAX_PASSPHRASE]; /** passphrase for private key */

View File

@@ -1,3 +1,28 @@
diff --git a/ConfigureChecks.cmake b/ConfigureChecks.cmake
index 7103f303..c64eb39d 100644
--- a/ConfigureChecks.cmake
+++ b/ConfigureChecks.cmake
@@ -258,6 +258,7 @@ if (UNIX)
check_library_exists(util forkpty "" HAVE_LIBUTIL)
check_function_exists(cfmakeraw HAVE_CFMAKERAW)
check_function_exists(__strtoull HAVE___STRTOULL)
+ check_symbol_exists(TCP_CONGESTION "netinet/tcp.h" HAVE_TCP_CONGESTION)
endif (UNIX)
set(LIBSSH_REQUIRED_LIBRARIES ${_REQUIRED_LIBRARIES} CACHE INTERNAL "libssh required system libraries")
diff --git a/config.h.cmake b/config.h.cmake
index 1357615b..1e915ead 100644
--- a/config.h.cmake
+++ b/config.h.cmake
@@ -237,6 +237,8 @@
#cmakedefine HAVE_GCC_BOUNDED_ATTRIBUTE 1
+#cmakedefine HAVE_TCP_CONGESTION 1
+
/* Define to 1 if you want to enable GSSAPI */
#cmakedefine WITH_GSSAPI 1
diff --git a/include/libssh/buffer.h b/include/libssh/buffer.h
index a55a1b40..e34e075c 100644
--- a/include/libssh/buffer.h
@@ -12,10 +37,18 @@ index a55a1b40..e34e075c 100644
int ssh_buffer_validate_length(struct ssh_buffer_struct *buffer, size_t len);
diff --git a/include/libssh/libssh.h b/include/libssh/libssh.h
index 7857a77b..3eef7a16 100644
index 7857a77b..6b4d481c 100644
--- a/include/libssh/libssh.h
+++ b/include/libssh/libssh.h
@@ -833,6 +833,7 @@ LIBSSH_API const char* ssh_get_hmac_in(ssh_session session);
@@ -402,6 +402,7 @@ enum ssh_options_e {
SSH_OPTIONS_GSSAPI_AUTH,
SSH_OPTIONS_GLOBAL_KNOWNHOSTS,
SSH_OPTIONS_NODELAY,
+ SSH_OPTIONS_CCALGO,
SSH_OPTIONS_PUBLICKEY_ACCEPTED_TYPES,
SSH_OPTIONS_PROCESS_CONFIG,
SSH_OPTIONS_REKEY_DATA,
@@ -833,6 +834,7 @@ LIBSSH_API const char* ssh_get_hmac_in(ssh_session session);
LIBSSH_API const char* ssh_get_hmac_out(ssh_session session);
LIBSSH_API ssh_buffer ssh_buffer_new(void);
@@ -23,7 +56,7 @@ index 7857a77b..3eef7a16 100644
LIBSSH_API void ssh_buffer_free(ssh_buffer buffer);
#define SSH_BUFFER_FREE(x) \
do { if ((x) != NULL) { ssh_buffer_free(x); x = NULL; } } while(0)
@@ -843,6 +844,8 @@ LIBSSH_API void *ssh_buffer_get(ssh_buffer buffer);
@@ -843,6 +845,8 @@ LIBSSH_API void *ssh_buffer_get(ssh_buffer buffer);
LIBSSH_API uint32_t ssh_buffer_get_len(ssh_buffer buffer);
LIBSSH_API int ssh_session_set_disconnect_message(ssh_session session, const char *message);
@@ -32,6 +65,18 @@ index 7857a77b..3eef7a16 100644
#ifndef LIBSSH_LEGACY_0_4
#include "libssh/legacy.h"
#endif
diff --git a/include/libssh/session.h b/include/libssh/session.h
index d3e5787c..15183d1b 100644
--- a/include/libssh/session.h
+++ b/include/libssh/session.h
@@ -232,6 +232,7 @@ struct ssh_session_struct {
int gss_delegate_creds;
int flags;
int nodelay;
+ char *ccalgo;
bool config_processed;
uint8_t options_seen[SOC_MAX];
uint64_t rekey_data;
diff --git a/include/libssh/sftp.h b/include/libssh/sftp.h
index c855df8a..0fcdb9b8 100644
--- a/include/libssh/sftp.h
@@ -158,6 +203,106 @@ index e0068015..cc0caf35 100644
/**
* @brief Ensure the buffer has at least a certain preallocated size.
*
diff --git a/src/connect.c b/src/connect.c
index 57e37e63..c02397d5 100644
--- a/src/connect.c
+++ b/src/connect.c
@@ -156,6 +156,20 @@ static int set_tcp_nodelay(socket_t socket)
sizeof(opt));
}
+static int set_tcp_ccalgo(socket_t socket, const char *ccalgo)
+{
+#ifdef HAVE_TCP_CONGESTION
+ return setsockopt(socket,
+ IPPROTO_TCP,
+ TCP_CONGESTION,
+ (void *)ccalgo,
+ strlen(ccalgo));
+#else
+ errno = ENOTSUP;
+ return -1;
+#endif
+}
+
/**
* @internal
*
@@ -256,6 +270,18 @@ socket_t ssh_connect_host_nonblocking(ssh_session session, const char *host,
}
}
+ if (session->opts.ccalgo) {
+ rc = set_tcp_ccalgo(s, session->opts.ccalgo);
+ if (rc < 0) {
+ ssh_set_error(session, SSH_FATAL,
+ "Failed to set TCP_CONGESTION on socket: %s",
+ ssh_strerror(errno, err_msg, SSH_ERRNO_MSG_MAX));
+ ssh_connect_socket_close(s);
+ s = -1;
+ continue;
+ }
+ }
+
errno = 0;
rc = connect(s, itr->ai_addr, itr->ai_addrlen);
if (rc == -1 && (errno != 0) && (errno != EINPROGRESS)) {
diff --git a/src/options.c b/src/options.c
index 49aaefa2..9f7360c3 100644
--- a/src/options.c
+++ b/src/options.c
@@ -210,6 +210,7 @@ int ssh_options_copy(ssh_session src, ssh_session *dest)
new->opts.gss_delegate_creds = src->opts.gss_delegate_creds;
new->opts.flags = src->opts.flags;
new->opts.nodelay = src->opts.nodelay;
+ new->opts.ccalgo = src->opts.ccalgo;
new->opts.config_processed = src->opts.config_processed;
new->common.log_verbosity = src->common.log_verbosity;
new->common.callbacks = src->common.callbacks;
@@ -450,6 +451,10 @@ int ssh_options_set_algo(ssh_session session,
* Set it to disable Nagle's Algorithm (TCP_NODELAY) on the
* session socket. (int, 0=false)
*
+ * - SSH_OPTIONS_CCALGO
+ * Set it to specify TCP congestion control algorithm on the
+ * session socket (Linux only). (int, 0=false)
+ *
* - SSH_OPTIONS_PROCESS_CONFIG
* Set it to false to disable automatic processing of per-user
* and system-wide OpenSSH configuration files. LibSSH
@@ -1013,6 +1018,20 @@ int ssh_options_set(ssh_session session, enum ssh_options_e type,
session->opts.nodelay = (*x & 0xff) > 0 ? 1 : 0;
}
break;
+ case SSH_OPTIONS_CCALGO:
+ v = value;
+ if (v == NULL || v[0] == '\0') {
+ ssh_set_error_invalid(session);
+ return -1;
+ } else {
+ SAFE_FREE(session->opts.ccalgo);
+ session->opts.ccalgo = strdup(v);
+ if (session->opts.ccalgo == NULL) {
+ ssh_set_error_oom(session);
+ return -1;
+ }
+ }
+ break;
case SSH_OPTIONS_PROCESS_CONFIG:
if (value == NULL) {
ssh_set_error_invalid(session);
diff --git a/src/session.c b/src/session.c
index 6025c133..6b197526 100644
--- a/src/session.c
+++ b/src/session.c
@@ -108,6 +108,7 @@ ssh_session ssh_new(void)
session->opts.fd = -1;
session->opts.compressionlevel = 7;
session->opts.nodelay = 0;
+ session->opts.ccalgo = NULL;
session->opts.flags = SSH_OPT_FLAG_PASSWORD_AUTH |
SSH_OPT_FLAG_PUBKEY_AUTH |
diff --git a/src/sftp.c b/src/sftp.c
index e01012a8..702623a0 100644
--- a/src/sftp.c

View File

@@ -278,9 +278,9 @@ void mscp_close(mf *f)
free(f);
}
int mscp_lseek(mf *f, size_t off)
off_t mscp_lseek(mf *f, off_t off)
{
int ret;
off_t ret;
if (f->remote) {
ret = sftp_seek64(f->remote, off);

View File

@@ -45,7 +45,7 @@ typedef struct mf_struct mf;
mf *mscp_open(const char *path, int flags, mode_t mode, sftp_session sftp);
void mscp_close(mf *f);
int mscp_lseek(mf *f, size_t off);
off_t mscp_lseek(mf *f, off_t off);
/* mscp_setstat() involves chmod and truncate. It executes both at
* once via a single SFTP command (sftp_setstat()).

View File

@@ -21,7 +21,8 @@ void usage(bool print_help) {
"Usage: mscp [vqDHdNh] [-n nr_conns] [-m coremask] [-u max_startups]\n"
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
" [-l login_name] [-p port] [-F ssh_config] [-i identity_file]\n"
" [-c cipher_spec] [-M hmac_spec] [-C compress] source ... target\n"
" [-c cipher_spec] [-M hmac_spec] [-C compress] [-g congestion]\n"
" source ... target\n"
"\n");
if (!print_help)
@@ -51,6 +52,7 @@ void usage(bool print_help) {
" -M HMAC hmac spec\n"
" -C COMPRESS enable compression: "
"yes, no, zlib, zlib@openssh.com\n"
" -g CONGESTION specify TCP congestion control algorithm\n"
" -H disable hostkey check\n"
" -d increment ssh debug output level\n"
" -N enable Nagle's algorithm (default disabled)\n"
@@ -58,58 +60,113 @@ void usage(bool print_help) {
"\n");
}
char *split_remote_and_path(const char *string, char **remote, char **path)
char *strip_brackets(char *s)
{
char *s, *p;
/* split user@host:path into user@host, and path.
* return value is strdup()ed memory (for free()).
*/
if (!(s = strdup(string))) {
fprintf(stderr, "strdup: %s\n", strerror(errno));
return NULL;
if (s[0] == '[' && s[strlen(s) - 1] == ']') {
s[strlen(s) - 1] = '\0';
return s + 1;
}
if ((p = strchr(s, ':'))) {
if (p == s || ((p > s) && *(p - 1) == '\\')) {
/* first byte is colon, or escaped colon. no user@host here */
goto no_remote;
} else {
/* we found ':', so this is remote:path notation. split it */
*p = '\0';
*remote = s;
*path = p + 1;
return s;
}
}
no_remote:
*remote = NULL;
*path = s;
return s;
}
char *split_user_host_path(const char *s, char **userp, char **hostp, char **pathp)
{
char *tmp, *cp, *user = NULL, *host = NULL, *path = NULL;
bool inbrackets = false;
if (!(tmp = strdup(s))) {
fprintf(stderr, "stdrup: %s\n", strerror(errno));
return NULL;
}
user = NULL;
host = NULL;
path = tmp;
for (cp = tmp; *cp; cp++) {
if (*cp == '@' && (cp > tmp) && *(cp - 1) != '\\' && user == NULL) {
/* cp is non-escaped '@', so this '@' is the
* delimitater between username and host. */
*cp = '\0';
user = tmp;
host = cp + 1;
}
if (*cp == '[')
inbrackets = true;
if (*cp == ']')
inbrackets = false;
if (*cp == ':' && (cp > tmp) && *(cp - 1) != '\\') {
if (!inbrackets) {
/* cp is non-escaped ':' and not in
* brackets for IPv6 address
* notation. So, this ':' is the
* delimitater between host and
* path. */
*cp = '\0';
host = host == NULL ? tmp : host;
path = cp + 1;
break;
}
}
}
*userp = user;
*hostp = host ? strip_brackets(host) : NULL;
*pathp = path;
return tmp;
}
struct target {
char *remote;
char *copy;
char *user;
char *host;
char *path;
};
int compare_remote(struct target *a, struct target *b)
{
/* return 0 if a and b have the identical user@host, otherwise 1 */
int alen, blen;
if (a->user) {
if (!b->user)
return 1;
alen = strlen(a->user);
blen = strlen(b->user);
if (alen != blen)
return 1;
if (strncmp(a->user, b->user, alen) != 0)
return 1;
} else if (b->user)
return 1;
if (a->host) {
if (!b->host)
return 1;
alen = strlen(a->host);
blen = strlen(b->host);
if (alen != blen)
return 1;
if (strncmp(a->host, b->host, alen) != 0)
return 1;
} else if (b->host)
return 1;
return 0;
}
struct target *validate_targets(char **arg, int len)
{
/* arg is array of source ... destination.
* There are two cases:
*
* 1. remote:path remote:path ... path, remote to local copy
* 2. path path ... remote:path, local to remote copy.
* 1. user@host:path host:path ... path, remote to local copy
* 2. path path ... host:path, local to remote copy.
*
* This function split (remote:)path args into struct target,
* This function split user@remote:path args into struct target,
* and validate all remotes are identical (mscp does not support
* remote to remote copy).
*/
struct target *t;
char *r;
struct target *t, *t0;
int n;
if ((t = calloc(len, sizeof(struct target))) == NULL) {
@@ -120,33 +177,28 @@ struct target *validate_targets(char **arg, int len)
/* split remote:path into remote and path */
for (n = 0; n < len; n++) {
if (split_remote_and_path(arg[n], &t[n].remote, &t[n].path) == NULL)
t[n].copy = split_user_host_path(arg[n], &t[n].user,
&t[n].host, &t[n].path);
if (!t[n].copy)
goto free_target_out;
}
/* check all remote are identical. t[len - 1] is destination,
/* check all user@host are identical. t[len - 1] is destination,
* so we need to check t[0] to t[len - 2] having the identical
* remote */
r = t[0].remote;
* remote notation */
t0 = &t[0];
for (n = 1; n < len - 1; n++) {
if (!r && t[n].remote) {
if (compare_remote(t0, &t[n]) != 0)
goto invalid_remotes;
}
if (r) {
if (!t[n].remote ||
strlen(r) != strlen(t[n].remote) ||
strcmp(r, t[n].remote) != 0)
goto invalid_remotes;
}
}
/* check inconsistent remote position in args */
if (t[0].remote == NULL && t[len - 1].remote == NULL) {
if (t[0].host == NULL && t[len - 1].host == NULL) {
fprintf(stderr, "no remote host given\n");
goto free_split_out;
}
if (t[0].remote != NULL && t[len - 1].remote != NULL) {
if (t[0].host != NULL && t[len - 1].host != NULL) {
fprintf(stderr, "no local path given\n");
goto free_split_out;
}
@@ -154,11 +206,11 @@ struct target *validate_targets(char **arg, int len)
return t;
invalid_remotes:
fprintf(stderr, "specified remote host invalid\n");
fprintf(stderr, "invalid remote host notation\n");
free_split_out:
for (n = 0; n < len; n++)
t[n].remote ? free(t[n].remote) : free(t[n].path);
if (t[n].copy) free(t[n].copy);
free_target_out:
free(t);
@@ -171,8 +223,6 @@ pthread_t tid_stat = 0;
void sigint_handler(int sig)
{
if (tid_stat)
pthread_cancel(tid_stat);
mscp_stop(m);
}
@@ -202,7 +252,8 @@ int main(int argc, char **argv)
memset(&o, 0, sizeof(o));
o.severity = MSCP_SEVERITY_WARN;
while ((ch = getopt(argc, argv, "n:m:u:s:S:a:b:vqDrl:p:i:F:c:M:C:HdNh")) != -1) {
while ((ch = getopt(argc, argv,
"n:m:u:s:S:a:b:vqDrl:p:i:F:c:M:C:g:HdNh")) != -1) {
switch (ch) {
case 'n':
o.nr_threads = atoi(optarg);
@@ -287,6 +338,13 @@ int main(int argc, char **argv)
}
strncpy(s.compress, optarg, MSCP_SSH_MAX_COMP_STR);
break;
case 'g':
if (strlen(optarg) > MSCP_SSH_MAX_CCALGO_STR - 1) {
fprintf(stderr, "long ccalgo string: %s\n", optarg);
return -1;
}
strncpy(s.ccalgo, optarg, MSCP_SSH_MAX_CCALGO_STR);
break;
case 'H':
s.no_hostkey_check = true;
break;
@@ -315,14 +373,19 @@ int main(int argc, char **argv)
if ((t = validate_targets(argv + optind, i)) == NULL)
return -1;
if (t[0].remote) {
if (t[0].host) {
/* copy remote to local */
direction = MSCP_DIRECTION_R2L;
remote = t[0].remote;
remote = t[0].host;
if (t[0].user != NULL && s.login_name[0] == '\0')
strncpy(s.login_name, t[0].user, MSCP_SSH_MAX_LOGIN_NAME - 1);
} else {
/* copy local to remote */
direction = MSCP_DIRECTION_L2R;
remote = t[i - 1].remote;
remote = t[i - 1].host;
if (t[i - 1].user != NULL && s.login_name[0] == '\0')
strncpy(s.login_name, t[i - 1].user,
MSCP_SSH_MAX_LOGIN_NAME - 1);
}
if (!dryrun) {
@@ -419,23 +482,43 @@ double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
return (double)diff / calculate_timedelta(b, a);
}
char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeval *a,
bool final)
{
static char buf[16];
double elapsed = calculate_timedelta(b, a);
double eta;
if (diff == 0)
#define bps_window_size 16
static double bps_window[bps_window_size];
static size_t sum, idx, count;
double elapsed = calculate_timedelta(b, a);
double bps = diff / elapsed;
double avg, eta;
/* early return when diff == 0 (stalled) or final output */
if (diff == 0) {
snprintf(buf, sizeof(buf), "--:-- ETA");
else if (final) {
return buf;
}
if (final) {
snprintf(buf, sizeof(buf), "%02d:%02d ",
(int)(floor(elapsed / 60)), (int)round(elapsed) % 60);
} else {
eta = remain / (diff / elapsed);
snprintf(buf, sizeof(buf), "%02d:%02d ETA",
(int)floor(eta / 60), (int)round(eta) % 60);
}
return buf;
}
/* drop the old bps value and add the recent one */
sum -= bps_window[idx];
bps_window[idx] = bps;
sum += bps_window[idx];
idx = (idx + 1) % bps_window_size;
count++;
/* calcuate ETA from avg of recent bps values */
avg = sum / min(count, bps_window_size);
eta = remain / avg;
snprintf(buf, sizeof(buf), "%02d:%02d ETA",
(int)floor(eta / 60), (int)round(eta) % 60);
return buf;
}
@@ -482,7 +565,7 @@ void print_progress(struct timeval *b, struct timeval *a,
char *byte_units[] = { "B ", "KB", "MB", "GB", "TB", "PB" };
char suffix[128];
int bps_u, byte_tu, byte_du;
size_t total_round, done_round;
double total_round, done_round;
int percent;
double bps;
@@ -505,11 +588,11 @@ void print_progress(struct timeval *b, struct timeval *a,
percent = floor(((double)(done) / (double)total) * 100);
done_round = done;
for (byte_du = 0; done_round > 1000 && byte_du < array_size(byte_units) - 1;
for (byte_du = 0; done_round > 1024 && byte_du < array_size(byte_units) - 1;
byte_du++)
done_round /= 1024;
snprintf(suffix, sizeof(suffix), "%4lu%s/%lu%s %6.1f%s %s",
snprintf(suffix, sizeof(suffix), "%4.1lf%s/%.1lf%s %6.1f%s %s",
done_round, byte_units[byte_du], total_round, byte_units[byte_tu],
bps, bps_units[bps_u],
calculate_eta(total - done, done - last, b, a, final));
@@ -526,17 +609,42 @@ struct xfer_stat {
};
struct xfer_stat x;
void print_stat_thread_cleanup(void *arg)
void print_stat(bool final)
{
struct pollfd pfd = { .fd = msg_fd, .events = POLLIN };
struct mscp_stats s;
char buf[8192];
int timeout;
if (poll(&pfd, 1, !final ? 100 : 0) < 0) {
fprintf(stderr, "poll: %s\n", strerror(errno));
return;
}
if (pfd.revents & POLLIN) {
memset(buf, 0, sizeof(buf));
if (read(msg_fd, buf, sizeof(buf)) < 0) {
fprintf(stderr, "read: %s\n", strerror(errno));
return;
}
print_cli("\r\033[K" "%s", buf);
}
gettimeofday(&x.after, NULL);
mscp_get_stats(m, &s);
x.total = s.total;
x.done = s.done;
if (calculate_timedelta(&x.before, &x.after) > 1 || final) {
mscp_get_stats(m, &s);
x.total = s.total;
x.done = s.done;
print_progress(!final ? &x.before : &x.start, &x.after,
x.total, !final ? x.last : 0, x.done, final);
x.before = x.after;
x.last = x.done;
}
}
/* print progress from the beginning */
print_progress(&x.start, &x.after, x.total, 0, x.done, true);
void print_stat_thread_cleanup(void *arg)
{
print_stat(true);
print_cli("\n"); /* final output */
}
@@ -555,31 +663,7 @@ void *print_stat_thread(void *arg)
pthread_cleanup_push(print_stat_thread_cleanup, NULL);
while (true) {
if (poll(&pfd, 1, 100) < 0) {
fprintf(stderr, "poll: %s\n", strerror(errno));
return NULL;
}
if (pfd.revents & POLLIN) {
memset(buf, 0, sizeof(buf));
if (read(msg_fd, buf, sizeof(buf)) < 0) {
fprintf(stderr, "read: %s\n", strerror(errno));
return NULL;
}
print_cli("\r\033[K" "%s", buf);
}
gettimeofday(&x.after, NULL);
if (calculate_timedelta(&x.before, &x.after) > 1) {
mscp_get_stats(m, &s);
x.total = s.total;
x.done = s.done;
print_progress(&x.before, &x.after, x.total, x.last, x.done,
false);
x.before = x.after;
x.last = x.done;
}
print_stat(false);
}
pthread_cleanup_pop(1);

View File

@@ -554,6 +554,8 @@ int mscp_start(struct mscp *m)
int mscp_join(struct mscp *m)
{
struct mscp_thread *t;
struct path *p;
size_t done = 0, nr_copied = 0, nr_tobe_copied = 0;
int n, ret = 0;
/* waiting for scan thread joins... */
@@ -563,6 +565,7 @@ int mscp_join(struct mscp *m)
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
list_for_each_entry(t, &m->thread_list, list) {
pthread_join(t->tid, NULL);
done += t->done;
if (t->ret < 0)
ret = t->ret;
if (t->sftp) {
@@ -577,6 +580,17 @@ int mscp_join(struct mscp *m)
m->first = NULL;
}
/* count up number of transferred files */
list_for_each_entry(p, &m->path_list, list) {
nr_tobe_copied++;
if (p->state == FILE_STATE_DONE) {
nr_copied++;
}
}
mpr_notice(m->msg_fp, "%lu/%lu bytes copied for %lu/%lu files\n",
done, m->total_bytes, nr_copied, nr_tobe_copied);
return ret;
}
@@ -608,7 +622,7 @@ void *mscp_copy_thread(void *arg)
goto err_out;
}
mpr_notice(m->msg_fp, "connecting to %s for a copy thread[%d]...\n",
mpr_notice(m->msg_fp, "connecting to %s for copy thread:%d...\n",
m->remote, t->id);
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
@@ -619,7 +633,7 @@ void *mscp_copy_thread(void *arg)
}
if (!t->sftp) {
mpr_err(m->msg_fp, "copy thread[%d]: %s\n", t->id, mscp_get_error());
mpr_err(m->msg_fp, "copy thread:%d: %s\n", t->id, mscp_get_error());
goto err_out;
}

View File

@@ -109,6 +109,7 @@ static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
"cipher", /* const char * */
"hmac", /* const char * */
"compress", /* const char * */
"ccalgo", /* const char * */
"password", /* const char * */
"passphrase", /* const char * */
@@ -117,10 +118,10 @@ static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
"enable_nagle", /* bool */
NULL,
};
const char *fmt = "si" "|" "ii" "kkk" "s" "iii" "ssss" "sssss" "ipp";
const char *fmt = "si" "|" "ii" "kkk" "s" "iii" "ssss" "ssssss" "ipp";
char *coremask = NULL;
char *login_name = NULL, *port = NULL, *config = NULL, *identity = NULL;
char *cipher = NULL, *hmac = NULL, *compress = NULL;
char *cipher = NULL, *hmac = NULL, *compress = NULL, *ccalgo = NULL;
char *password = NULL, *passphrase = NULL;
struct instance *i;
@@ -154,6 +155,7 @@ static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
&cipher,
&hmac,
&compress,
&ccalgo,
&password,
&passphrase,
&i->so.debug_level,
@@ -179,6 +181,8 @@ static PyObject *wrap_mscp_init(PyObject *self, PyObject *args, PyObject *kw)
strncpy(i->so.hmac, hmac, MSCP_SSH_MAX_HMAC_STR - 1);
if (compress)
strncpy(i->so.compress, compress, MSCP_SSH_MAX_COMP_STR - 1);
if (ccalgo)
strncpy(i->so.ccalgo, ccalgo, MSCP_SSH_MAX_CCALGO_STR - 1);
if (password)
strncpy(i->so.password, password, MSCP_SSH_MAX_PASSWORD - 1);
if (passphrase)

View File

@@ -64,6 +64,12 @@ static int ssh_set_opts(ssh_session ssh, struct mscp_ssh_opts *opts)
return -1;
}
if (is_specified(opts->ccalgo) &&
ssh_options_set(ssh, SSH_OPTIONS_CCALGO, opts->ccalgo) < 0) {
mscp_set_error("failed to set cclago");
return -1;
}
/* if NOT specified to enable Nagle's algorithm, disable it (set TCP_NODELAY) */
if (!opts->enable_nagle) {
int v = 1;

View File

@@ -3,7 +3,9 @@
test_e2e.py: End-to-End test for mscp executable.
"""
import platform
import pytest
import getpass
import os
from subprocess import check_call, CalledProcessError, PIPE
@@ -89,6 +91,47 @@ def test_double_copy(mscp, src_prefix, dst_prefix, s1, s2, d1, d2):
d1.cleanup()
d2.cleanup()
remote_v6_prefix = "[::1]:{}/".format(os.getcwd())
param_remote_v6_prefix = [
("", remote_v6_prefix), (remote_v6_prefix, "")
]
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_v6_prefix)
@pytest.mark.parametrize("s1, s2, d1, d2", param_double_copy)
def test_double_copy_with_ipv6_notation(mscp, src_prefix, dst_prefix, s1, s2, d1, d2):
s1.make()
s2.make()
run2ok([mscp, "-H", "-vvv",
src_prefix + s1.path, src_prefix + s2.path, dst_prefix + "dst"])
assert check_same_md5sum(s1, d1)
assert check_same_md5sum(s2, d2)
s1.cleanup()
s2.cleanup()
d1.cleanup()
d2.cleanup()
remote_user_v6_prefix = "{}@[::1]:{}/".format(getpass.getuser(), os.getcwd())
param_remote_user_v6_prefix = [
("", remote_user_v6_prefix), (remote_user_v6_prefix, "")
]
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_user_v6_prefix)
@pytest.mark.parametrize("s1, s2, d1, d2", param_double_copy)
def test_double_copy_with_user_and_ipv6_notation(mscp, src_prefix, dst_prefix,
s1, s2, d1, d2):
s1.make()
s2.make()
run2ok([mscp, "-H", "-vvv",
src_prefix + s1.path, src_prefix + s2.path, dst_prefix + "dst"])
assert check_same_md5sum(s1, d1)
assert check_same_md5sum(s2, d2)
s1.cleanup()
s2.cleanup()
d1.cleanup()
d2.cleanup()
param_dir_copy = [
( "src_dir", "dst_dir",
[ File("src_dir/t1", size = 64),
@@ -143,7 +186,7 @@ param_dir_copy_single = [
def test_dir_copy_single(mscp, src_prefix, dst_prefix, src_dir, dst_dir, src, dst):
src.make()
os.mkdir(dst_dir)
run2ok(["mscp", "-H", "-vvv", src_prefix + src_dir, dst_prefix + dst_dir])
run2ok([mscp, "-H", "-vvv", src_prefix + src_dir, dst_prefix + dst_dir])
assert check_same_md5sum(src, dst)
src.cleanup()
dst.cleanup()
@@ -268,6 +311,21 @@ def test_compression(mscp, src_prefix, dst_prefix, compress):
src.cleanup()
dst.cleanup()
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
def test_ccalgo(mscp, src_prefix, dst_prefix):
src = File("src", size = 1024 * 1024).make()
dst = File("dst").make()
if platform.system() == "Darwin":
# Darwin does not support TCP_CONGESTION
algo = "cubic"
run = run2ng
elif platform.system() == "Linux":
# Linux supports TCP_CONGESTION
with open("/proc/sys/net/ipv4/tcp_allowed_congestion_control", "r") as f:
algo = f.read().strip().split().pop()
run = run2ok
run([mscp, "-H", "-vvv", "-g", algo, src_prefix + src.path, dst_prefix + "dst"])
testhost = "mscptestlocalhost"
testhost_prefix = "{}:{}/".format(testhost, os.getcwd()) # use current dir

View File

@@ -121,6 +121,7 @@ param_invalid_kwargs = [
{ "cipher": "invalid" },
{ "hmac": "invalid"},
{ "compress": "invalid"},
{ "ccalgo": "invalid"},
]
@pytest.mark.parametrize("kw", param_invalid_kwargs)