mirror of
https://github.com/upa/mscp.git
synced 2026-02-27 10:44:41 +08:00
introduce semaphore for concurrent connecting ssh
instead of ssh_estab_queue (delay-based approach). MaxStartups in sshd_config limits number of conccurent incoming ssh connections. mscp_opts->max_startups adjusts this value.
This commit is contained in:
@@ -43,6 +43,7 @@ struct mscp_opts {
|
|||||||
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
|
size_t max_chunk_sz; /** maximum chunk size (default file size/nr_threads) */
|
||||||
size_t buf_sz; /** buffer size, default 16k. */
|
size_t buf_sz; /** buffer size, default 16k. */
|
||||||
char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */
|
char coremask[MSCP_MAX_COREMASK_STR]; /** hex to specifiy usable cpu cores */
|
||||||
|
int max_startups; /* sshd MaxStartups conccurent connections */
|
||||||
|
|
||||||
int severity; /** messaging severity. set MSCP_SERVERITY_* */
|
int severity; /** messaging severity. set MSCP_SERVERITY_* */
|
||||||
int msg_fd; /** fd to output message. default STDOUT (0),
|
int msg_fd; /** fd to output message. default STDOUT (0),
|
||||||
|
|||||||
84
src/mscp.c
84
src/mscp.c
@@ -2,6 +2,7 @@
|
|||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <math.h>
|
#include <math.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <semaphore.h>
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
|
|
||||||
#include <list.h>
|
#include <list.h>
|
||||||
@@ -13,55 +14,6 @@
|
|||||||
#include <message.h>
|
#include <message.h>
|
||||||
#include <mscp.h>
|
#include <mscp.h>
|
||||||
|
|
||||||
struct ssh_estab_queue {
|
|
||||||
lock lock;
|
|
||||||
struct timeval enter;
|
|
||||||
long delay; /* msec */
|
|
||||||
};
|
|
||||||
|
|
||||||
static long timeval_sub(struct timeval a, struct timeval b)
|
|
||||||
{
|
|
||||||
unsigned long sec, usec;
|
|
||||||
if (a.tv_usec < b.tv_usec) {
|
|
||||||
a.tv_usec += 1000000;
|
|
||||||
a.tv_sec--;
|
|
||||||
}
|
|
||||||
|
|
||||||
sec = a.tv_sec - b.tv_sec;
|
|
||||||
usec = a.tv_usec - b.tv_usec;
|
|
||||||
return sec * 1000000 + usec;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void ssh_estab_queue_init(struct ssh_estab_queue *q)
|
|
||||||
{
|
|
||||||
memset(q, 0, sizeof(q));
|
|
||||||
lock_init(&q->lock);
|
|
||||||
q->delay = 100000; /* To be configurable */
|
|
||||||
}
|
|
||||||
|
|
||||||
static void ssh_estab_queue_ready(struct ssh_estab_queue *q)
|
|
||||||
{
|
|
||||||
struct timeval now;
|
|
||||||
long delta;
|
|
||||||
|
|
||||||
LOCK_ACQUIRE_THREAD(&q->lock);
|
|
||||||
if (q->enter.tv_sec == 0 && q->enter.tv_usec == 0) {
|
|
||||||
/* I'm the first one. */
|
|
||||||
goto ready;
|
|
||||||
}
|
|
||||||
|
|
||||||
gettimeofday(&now, NULL);
|
|
||||||
delta = timeval_sub(now, q->enter);
|
|
||||||
if (delta <= q->delay) {
|
|
||||||
/* wait until enter + delay time */
|
|
||||||
usleep(q->delay - delta);
|
|
||||||
}
|
|
||||||
ready:
|
|
||||||
gettimeofday(&q->enter, NULL);
|
|
||||||
LOCK_RELEASE_THREAD();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
struct mscp {
|
struct mscp {
|
||||||
char *remote; /* remote host (and uername) */
|
char *remote; /* remote host (and uername) */
|
||||||
@@ -74,7 +26,8 @@ struct mscp {
|
|||||||
int *cores; /* usable cpu cores by COREMASK */
|
int *cores; /* usable cpu cores by COREMASK */
|
||||||
int nr_cores; /* length of array of cores */
|
int nr_cores; /* length of array of cores */
|
||||||
|
|
||||||
struct ssh_estab_queue ssh_queue;
|
sem_t sem; /* semaphore for conccurent
|
||||||
|
* connecting ssh sessions */
|
||||||
|
|
||||||
sftp_session first; /* first sftp session */
|
sftp_session first; /* first sftp session */
|
||||||
|
|
||||||
@@ -117,6 +70,8 @@ struct src {
|
|||||||
* sftp_async_read returns 0.
|
* sftp_async_read returns 0.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define DEFAULT_MAX_STARTUPS 8
|
||||||
|
|
||||||
#define non_null_string(s) (s[0] != '\0')
|
#define non_null_string(s) (s[0] != '\0')
|
||||||
|
|
||||||
|
|
||||||
@@ -236,6 +191,9 @@ static int validate_and_set_defaut_params(struct mscp_opts *o)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (o->max_startups == 0)
|
||||||
|
o->max_startups = DEFAULT_MAX_STARTUPS;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -271,7 +229,10 @@ struct mscp *mscp_init(const char *remote_host, int direction,
|
|||||||
INIT_LIST_HEAD(&m->src_list);
|
INIT_LIST_HEAD(&m->src_list);
|
||||||
INIT_LIST_HEAD(&m->path_list);
|
INIT_LIST_HEAD(&m->path_list);
|
||||||
chunk_pool_init(&m->cp);
|
chunk_pool_init(&m->cp);
|
||||||
ssh_estab_queue_init(&m->ssh_queue);
|
if (sem_init(&m->sem, 0, o->max_startups) < 0) {
|
||||||
|
mscp_set_error("failed to initialize semaphore: %s", strerrno());
|
||||||
|
goto free_out;
|
||||||
|
}
|
||||||
|
|
||||||
m->remote = strdup(remote_host);
|
m->remote = strdup(remote_host);
|
||||||
if (!m->remote) {
|
if (!m->remote) {
|
||||||
@@ -591,14 +552,25 @@ void *mscp_copy_thread(void *arg)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ssh_estab_queue_ready(&m->ssh_queue);
|
if (sem_wait(&m->sem) < 0) {
|
||||||
|
mscp_set_error("sem_wait: %s\n", strerrno());
|
||||||
|
mpr_err(m->msg_fp, "%s", mscp_get_error());
|
||||||
|
goto err_out;
|
||||||
|
}
|
||||||
|
|
||||||
mpr_notice(m->msg_fp, "connecting to %s for a copy thread[%d]...\n",
|
mpr_notice(m->msg_fp, "connecting to %s for a copy thread[%d]...\n",
|
||||||
m->remote, t->id);
|
m->remote, t->id);
|
||||||
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
|
||||||
|
|
||||||
|
if (sem_post(&m->sem) < 0) {
|
||||||
|
mscp_set_error("sem_post: %s\n", strerrno());
|
||||||
|
mpr_err(m->msg_fp, "%s", mscp_get_error());
|
||||||
|
goto err_out;
|
||||||
|
}
|
||||||
|
|
||||||
if (!t->sftp) {
|
if (!t->sftp) {
|
||||||
mpr_err(m->msg_fp, "copy thread[%d]: %s\n", t->id, mscp_get_error());
|
mpr_err(m->msg_fp, "copy thread[%d]: %s\n", t->id, mscp_get_error());
|
||||||
t->ret = -1;
|
goto err_out;
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (m->direction) {
|
switch (m->direction) {
|
||||||
@@ -641,6 +613,10 @@ void *mscp_copy_thread(void *arg)
|
|||||||
c->p->path, c->off, c->off + c->len);
|
c->p->path, c->off, c->off + c->len);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
|
err_out:
|
||||||
|
t->ret = -1;
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user