mirror of
https://github.com/upa/mscp.git
synced 2026-02-04 03:24:58 +08:00
add -m coremask option
This commit is contained in:
95
src/main.c
95
src/main.c
@@ -50,6 +50,7 @@ struct mscp_thread {
|
|||||||
sftp_session sftp;
|
sftp_session sftp;
|
||||||
|
|
||||||
pthread_t tid;
|
pthread_t tid;
|
||||||
|
int cpu;
|
||||||
size_t done; /* copied bytes */
|
size_t done; /* copied bytes */
|
||||||
bool finished;
|
bool finished;
|
||||||
int ret;
|
int ret;
|
||||||
@@ -85,7 +86,7 @@ int list_count(struct list_head *head)
|
|||||||
void usage(bool print_help) {
|
void usage(bool print_help) {
|
||||||
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
|
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
|
||||||
"\n"
|
"\n"
|
||||||
"Usage: mscp [vqDCHdh] [-n nr_conns]\n"
|
"Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]\n"
|
||||||
" [-s min_chunk_sz] [-S max_chunk_sz]\n"
|
" [-s min_chunk_sz] [-S max_chunk_sz]\n"
|
||||||
" [-b sftp_buf_sz] [-B io_buf_sz] [-a nr_ahead]\n"
|
" [-b sftp_buf_sz] [-B io_buf_sz] [-a nr_ahead]\n"
|
||||||
" [-l login_name] [-p port] [-i identity_file]\n"
|
" [-l login_name] [-p port] [-i identity_file]\n"
|
||||||
@@ -96,6 +97,7 @@ void usage(bool print_help) {
|
|||||||
return;
|
return;
|
||||||
|
|
||||||
printf(" -n NR_CONNECTIONS number of connections (default: half of # of cpu cores)\n"
|
printf(" -n NR_CONNECTIONS number of connections (default: half of # of cpu cores)\n"
|
||||||
|
" -m COREMASK hex value to specify cores where threads pinned\n"
|
||||||
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
|
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
|
||||||
" -S MAX_CHUNK_SIZE max chunk size (default: filesize / nr_conn)\n"
|
" -S MAX_CHUNK_SIZE max chunk size (default: filesize / nr_conn)\n"
|
||||||
"\n"
|
"\n"
|
||||||
@@ -164,15 +166,70 @@ err_out:
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int expand_coremask(const char *coremask, int **cores, int *nr_cores)
|
||||||
|
{
|
||||||
|
int n, *core_list, core_list_len = 0, nr_usable, nr_all;
|
||||||
|
char c[2] = { 'x', '\0' };
|
||||||
|
const char *_coremask;
|
||||||
|
long v, needle;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This function returns array of usabe cores in `cores` and
|
||||||
|
* returns the number of usabel cores (array length) through
|
||||||
|
* nr_cores.
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (strncmp(coremask, "0x", 2) == 0)
|
||||||
|
_coremask = coremask + 2;
|
||||||
|
else
|
||||||
|
_coremask = coremask;
|
||||||
|
|
||||||
|
core_list = realloc(NULL, sizeof(int) * 64);
|
||||||
|
if (!core_list) {
|
||||||
|
pr_err("failed to realloc: %s\n", strerrno());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
nr_usable = 0;
|
||||||
|
nr_all = 0;
|
||||||
|
for (n = strlen(_coremask) - 1; n >=0; n--) {
|
||||||
|
c[0] = _coremask[n];
|
||||||
|
v = strtol(c, NULL, 16);
|
||||||
|
if (v == LONG_MIN || v == LONG_MAX) {
|
||||||
|
pr_err("invalid coremask: %s\n", coremask);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (needle = 0x01; needle < 0x10; needle <<= 1) {
|
||||||
|
nr_all++;
|
||||||
|
if (v & needle) {
|
||||||
|
nr_usable++;
|
||||||
|
core_list = realloc(core_list, sizeof(int) * nr_usable);
|
||||||
|
if (!core_list) {
|
||||||
|
pr_err("failed to realloc: %s\n", strerrno());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
core_list[nr_usable - 1] = nr_all - 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*cores = core_list;
|
||||||
|
*nr_cores = nr_usable;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char **argv)
|
int main(int argc, char **argv)
|
||||||
{
|
{
|
||||||
struct mscp m;
|
struct mscp m;
|
||||||
struct ssh_opts opts;
|
struct ssh_opts opts;
|
||||||
int min_chunk_sz = DEFAULT_MIN_CHUNK_SZ;
|
int min_chunk_sz = DEFAULT_MIN_CHUNK_SZ;
|
||||||
int max_chunk_sz = 0;
|
int max_chunk_sz = 0;
|
||||||
|
char *coremask = NULL;;
|
||||||
int verbose = 1;
|
int verbose = 1;
|
||||||
bool dryrun = false;
|
bool dryrun = false;
|
||||||
int ret = 0, n;
|
int ret = 0, n;
|
||||||
|
int *cores, nr_cores;
|
||||||
char ch;
|
char ch;
|
||||||
|
|
||||||
memset(&opts, 0, sizeof(opts));
|
memset(&opts, 0, sizeof(opts));
|
||||||
@@ -187,7 +244,7 @@ int main(int argc, char **argv)
|
|||||||
nr_threads = (int)(nr_cpus() / 2);
|
nr_threads = (int)(nr_cpus() / 2);
|
||||||
nr_threads = nr_threads == 0 ? 1 : nr_threads;
|
nr_threads = nr_threads == 0 ? 1 : nr_threads;
|
||||||
|
|
||||||
while ((ch = getopt(argc, argv, "n:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) {
|
while ((ch = getopt(argc, argv, "n:m:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) {
|
||||||
switch (ch) {
|
switch (ch) {
|
||||||
case 'n':
|
case 'n':
|
||||||
nr_threads = atoi(optarg);
|
nr_threads = atoi(optarg);
|
||||||
@@ -196,6 +253,9 @@ int main(int argc, char **argv)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case 'm':
|
||||||
|
coremask = optarg;
|
||||||
|
break;
|
||||||
case 's':
|
case 's':
|
||||||
min_chunk_sz = atoi(optarg);
|
min_chunk_sz = atoi(optarg);
|
||||||
if (min_chunk_sz < getpagesize()) {
|
if (min_chunk_sz < getpagesize()) {
|
||||||
@@ -288,20 +348,29 @@ int main(int argc, char **argv)
|
|||||||
|
|
||||||
pprint_set_level(verbose);
|
pprint_set_level(verbose);
|
||||||
|
|
||||||
|
if (argc - optind < 2) {
|
||||||
|
/* mscp needs at lease 2 (src and target) argument */
|
||||||
|
usage(false);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
m.target = argv[argc - 1];
|
||||||
|
|
||||||
if (max_chunk_sz > 0 && min_chunk_sz > max_chunk_sz) {
|
if (max_chunk_sz > 0 && min_chunk_sz > max_chunk_sz) {
|
||||||
pr_err("smaller max chunk size than min chunk size: %d < %d\n",
|
pr_err("smaller max chunk size than min chunk size: %d < %d\n",
|
||||||
max_chunk_sz, min_chunk_sz);
|
max_chunk_sz, min_chunk_sz);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (argc - optind < 2) {
|
/* expand usable cores from coremask */
|
||||||
/* mscp needs at lease 2 (src and target) argument */
|
if (coremask) {
|
||||||
usage(false);
|
if (expand_coremask(coremask, &cores, &nr_cores) < 0)
|
||||||
return 1;
|
return -1;
|
||||||
|
pprint(1, "cpu cores:");
|
||||||
|
for (n = 0; n < nr_cores; n++)
|
||||||
|
pprint(1, " %d", cores[n]);
|
||||||
|
pprint(1, "\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
m.target = argv[argc - 1];
|
|
||||||
|
|
||||||
/* create control session */
|
/* create control session */
|
||||||
m.host = find_hostname(optind, argc, argv);
|
m.host = find_hostname(optind, argc, argv);
|
||||||
if (!m.host) {
|
if (!m.host) {
|
||||||
@@ -350,6 +419,11 @@ int main(int argc, char **argv)
|
|||||||
struct mscp_thread *t = &threads[n];
|
struct mscp_thread *t = &threads[n];
|
||||||
t->mscp = &m;
|
t->mscp = &m;
|
||||||
t->finished = false;
|
t->finished = false;
|
||||||
|
if (!coremask)
|
||||||
|
t->cpu = -1;
|
||||||
|
else
|
||||||
|
t->cpu = cores[n % nr_cores];
|
||||||
|
|
||||||
pprint3("connecting to %s for a copy thread...\n", m.host);
|
pprint3("connecting to %s for a copy thread...\n", m.host);
|
||||||
t->sftp = ssh_make_sftp_session(m.host, m.opts);
|
t->sftp = ssh_make_sftp_session(m.host, m.opts);
|
||||||
if (!t->sftp) {
|
if (!t->sftp) {
|
||||||
@@ -426,6 +500,11 @@ void *mscp_copy_thread(void *arg)
|
|||||||
sftp_session sftp = t->sftp;
|
sftp_session sftp = t->sftp;
|
||||||
struct chunk *c;
|
struct chunk *c;
|
||||||
|
|
||||||
|
if (t->cpu > -1) {
|
||||||
|
if (set_thread_affinity(pthread_self(), t->cpu) < 0)
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
||||||
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
|
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
|
||||||
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
|
||||||
|
|||||||
@@ -24,6 +24,14 @@ int nr_cpus()
|
|||||||
|
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int set_thread_affinity(pthread_t tid, int core)
|
||||||
|
{
|
||||||
|
errno = ENOTSUP;
|
||||||
|
pr_err("setting thread afinity is not implemented on apple\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef linux
|
#ifdef linux
|
||||||
@@ -34,5 +42,19 @@ int nr_cpus()
|
|||||||
return CPU_COUNT(&cpu_set);
|
return CPU_COUNT(&cpu_set);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int set_thread_affinity(pthread_t tid, int core)
|
||||||
|
{
|
||||||
|
cpu_set_t target_cpu_set;
|
||||||
|
int ret = 0;
|
||||||
|
|
||||||
|
CPU_ZERO(&target_cpu_set);
|
||||||
|
CPU_SET(core, &target_cpu_set);
|
||||||
|
ret = pthread_setaffinity_np(tid, sizeof(target_cpu_set), &target_cpu_set);
|
||||||
|
if (ret < 0)
|
||||||
|
pr_err("failed to set thread/cpu affinity for core %d: %s",
|
||||||
|
core, strerrno());
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
#ifndef _PLATFORM_H_
|
#ifndef _PLATFORM_H_
|
||||||
#define _PLATFORM_H_
|
#define _PLATFORM_H_
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
int nr_cpus();
|
int nr_cpus();
|
||||||
|
int set_thread_affinity(pthread_t tid, int core);
|
||||||
|
|
||||||
#endif /* _PLATFORM_H_ */
|
#endif /* _PLATFORM_H_ */
|
||||||
|
|||||||
@@ -150,6 +150,18 @@ def test_min_chunk(mscp, src_prefix, dst_prefix):
|
|||||||
src.cleanup()
|
src.cleanup()
|
||||||
dst.cleanup()
|
dst.cleanup()
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
|
||||||
|
def test_thread_affinity(mscp, src_prefix, dst_prefix):
|
||||||
|
src = File("src", size = 64 * 1024).make()
|
||||||
|
dst = File("dst")
|
||||||
|
|
||||||
|
run2ok([mscp, "-H", "-n", 4, "-m", "0x01", "-s", 8192, "-S", 65536,
|
||||||
|
src_prefix + src.path, dst_prefix + dst.path])
|
||||||
|
assert check_same_md5sum(src, dst)
|
||||||
|
|
||||||
|
src.cleanup()
|
||||||
|
dst.cleanup()
|
||||||
|
|
||||||
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
|
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
|
||||||
def test_cannot_override_file_with_dir(mscp, src_prefix, dst_prefix):
|
def test_cannot_override_file_with_dir(mscp, src_prefix, dst_prefix):
|
||||||
src = File("src", size = 128).make()
|
src = File("src", size = 128).make()
|
||||||
|
|||||||
Reference in New Issue
Block a user