10 Commits

Author SHA1 Message Date
Ryo Nakamura
8ab06c9531 bump version to 0.0.9 2023-07-20 22:11:48 +09:00
Ryo Nakamura
a847ef1ea8 drop centos8, add almalinux 8.8, update rocky to 8.8
And cleanup Docker files
2023-07-20 21:54:43 +09:00
Ryo Nakamura
24e86f58d8 mscp: maintain mscp_thread structs in list
Instead of m->threads array, struct mscp_thread instanes are
maintained in m->thread_list. This enables stable counter access
via mscp_get_stats().
2023-05-07 21:05:05 +09:00
Ryo Nakamura
1d3b3a2261 main: add a white space to the elapsed time output
It adjusts the position of XX:XX in elapsed timeou output.
2023-04-05 19:07:10 +09:00
Ryo Nakamura
575c920b6e main: print elapsed time instead ETA at the end 2023-04-05 19:00:29 +09:00
Ryo Nakamura
1bd832a135 Merge branch 'main' of github.com:upa/mscp 2023-03-26 01:50:00 +09:00
Ryo Nakamura
834407379d fix error handling when scan thread failed.
set chunk pool to fill to invoke copy threads when scan failed.
2023-03-25 22:29:09 +09:00
Ryo Nakamura
6be61e8adf test: add sleep -1 before ssh-keyscan 2023-03-22 19:24:14 +09:00
Ryo Nakamura
8192151154 fix invalid return sem 2023-03-22 18:06:19 +09:00
Ryo Nakamura
3f00bd2c7b test: set min_chunk_sz to 32768 on test_min_chunk
Page size of arm mac is 16384.
2023-03-22 18:00:52 +09:00
22 changed files with 252 additions and 154 deletions

View File

@@ -45,8 +45,8 @@ jobs:
files: |
${{github.workspace}}/build/mscp_ubuntu-20.04-x86_64.deb
${{github.workspace}}/build/mscp_ubuntu-22.04-x86_64.deb
${{github.workspace}}/build/mscp_centos-8-x86_64.rpm
${{github.workspace}}/build/mscp_rocky-8.6-x86_64.rpm
${{github.workspace}}/build/mscp_rocky-8.8-x86_64.rpm
${{github.workspace}}/build/mscp_almalinux-8.8-x86_64.rpm
${{github.workspace}}/build/mscp_alpine-3.17-x86_64.static
${{github.workspace}}/build/mscp.linux.x86.static

View File

@@ -121,9 +121,8 @@ enable_testing()
# CPACK Rules
set(CPACK_SET_DESTDIR true)
set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
#set(CPACK_SET_DESTDIR true)
set(CPACK_PACKAGE_NAME ${PROJECT_NAME})
set(CPACK_PACKAGE_CONTACT "Ryo Nakamura <upa@haeena.net>")
set(CPACK_PACKAGE_DESCRIPTION
"mscp, copy files over multiple ssh connections")
@@ -153,6 +152,7 @@ if(UNIX AND NOT APPLE) # on linux
set(CPACK_RPM_PACKAGE_REQUIRES ${DIST_DEP})
set(CPACK_RPM_PACKAGE_HOMEPAGE "https://github.com/upa/mscp")
set(CPACK_RPM_PACKAGE_DESCRIPTION ${CPACK_PACKAGE_DESCRIPTION})
set(CPACK_RPM_PACKAGE_LICENSE "GPLv3")
endif() # on linux
include(CPack)
@@ -162,9 +162,9 @@ include(CPack)
# Custom targets to build and test mscp in docker containers.
# foreach(IN ZIP_LISTS) (cmake >= 3.17) can shorten the following lists.
# However, ubuntu 20.04 has cmake 3.16.3. So this is a roundabout trick.
list(APPEND DIST_NAMES ubuntu ubuntu centos rocky alpine)
list(APPEND DIST_VERS 20.04 22.04 8 8.6 3.17)
list(APPEND DIST_PKGS deb deb rpm rpm static)
list(APPEND DIST_NAMES ubuntu ubuntu rocky almalinux alpine)
list(APPEND DIST_VERS 20.04 22.04 8.8 8.8 3.17)
list(APPEND DIST_PKGS deb deb rpm rpm static)
list(LENGTH DIST_NAMES _DIST_LISTLEN)
math(EXPR DIST_LISTLEN "${_DIST_LISTLEN} - 1")

View File

@@ -48,9 +48,14 @@ wget https://github.com/upa/mscp/releases/latest/download/mscp_ubuntu-20.04-x86_
apt-get install -f ./mscp_ubuntu-20.04-x86_64.deb
```
- Rocky 8.6
- Rocky 8.8
```console
yum install https://github.com/upa/mscp/releases/latest/download/mscp_rocky-8.6-x86_64.rpm
yum install https://github.com/upa/mscp/releases/latest/download/mscp_rocky-8.8-x86_64.rpm
```
- Alma 8.8
```console
yum install https://github.com/upa/mscp/releases/latest/download/mscp_almalinux-8.8-x86_64.rpm
```
- Linux with single binary `mscp` (x86_64 only)
@@ -67,7 +72,7 @@ patch introduces asynchronous SFTP Write, which is derived from
https://github.com/limes-datentechnik-gmbh/libssh (see [Re: SFTP Write
async](https://archive.libssh.org/libssh/2020-06/0000004.html)).
Currently macOS and Linux (Ubuntu, CentOS, Rocky) are supported.
Currently macOS and Linux (Ubuntu, Rocky and Alma) are supported.
```console
# clone this repository

View File

@@ -1 +1 @@
0.0.8
0.0.9

View File

@@ -8,9 +8,7 @@ docker build -t mscp-ubuntu:20.04 -f docker/ubuntu-20.04.Dockerfile .
docker build -t mscp-ubuntu:22.04 -f docker/ubuntu-22.04.Dockerfile .
docker build -t mscp-centos:8 -f docker/centos-8.Dockerfile .
docker build -t mscp-rocky:8.6 -f docker/rocky-8.6.Dockerfile .
docker build -t mscp-rocky:8.8 -f docker/rocky-8.Dockerfile .
```
Test `mscp` in the containers.
@@ -20,25 +18,20 @@ docker run --init --rm mscp-ubuntu:20.04 /mscp/scripts/test-in-container.sh
docker run --init --rm mscp-ubuntu:22.04 /mscp/scripts/test-in-container.sh
docker run --init --rm mscp-centos:8 /mscp/scripts/test-in-container.sh
docker run --init --rm mscp-rocky:8.6 /mscp/scripts/test-in-container.sh
docker run --init --rm mscp-rocky:8.9 /mscp/scripts/test-in-container.sh
```
Retrieve deb/rpm packages.
```console
docker run --rm -v (pwd):/out mscp-ubuntu:20.04 \
cp /mscp/build/mscp_0.0.0-ubuntu-20.04-x86_64.deb /out/
cp /mscp/build/mscp_ubuntu-20.04-x86_64.deb /out/
docker run --rm -v (pwd):/out mscp-ubuntu:22.04 \
cp /mscp/build/mscp_0.0.0-ubuntu-22.04-x86_64.deb /out/
cp /mscp/build/mscp_ubuntu-22.04-x86_64.deb /out/
docker run --rm -v (pwd):/out mscp-centos:8 \
cp /mscp/build/mscp_0.0.0-centos-8-x86_64.rpm /out/
docker run --rm -v (pwd):/out mscp-rocky:8.6 \
cp /mscp/build/mscp_0.0.0-rocky-8.6-x86_64.rpm /out/
docker run --rm -v (pwd):/out mscp-rocky:8.8 \
cp /mscp/build/mscp_rocky-8.8-x86_64.rpm /out/
```
I don't know whether these are good way.

View File

@@ -1,8 +1,4 @@
FROM rockylinux:8.6
ARG mscpdir="/mscp"
COPY . ${mscpdir}
FROM almalinux:8.8
# install pytest, sshd for test, and rpm-build
RUN set -ex && yum -y install \
@@ -17,6 +13,11 @@ RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
ARG mscpdir="/mscp"
COPY . ${mscpdir}
# install build dependency
RUN ${mscpdir}/scripts/install-build-deps.sh

View File

@@ -2,18 +2,26 @@ FROM alpine:3.17
# Build mscp with conan to create single binary mscp
ARG mscpdir="/mscp"
COPY . ${mscpdir}
RUN apk add --no-cache \
gcc make cmake python3 py3-pip perl linux-headers libc-dev \
openssh bash python3-dev g++
RUN pip3 install conan pytest
# preparation for sshd
RUN ssh-keygen -A
RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
# Build mscp as a single binary
RUN conan profile detect --force
ARG mscpdir="/mscp"
COPY . ${mscpdir}
RUN cd ${mscpdir} \
&& rm -rf build \
&& conan install . --output-folder=build --build=missing \
@@ -25,15 +33,8 @@ RUN cd ${mscpdir} \
&& make \
&& cp mscp /usr/bin/ \
&& cp mscp /mscp/build/mscp_alpine-3.17-x86_64.static
# copy mscp to PKG FILE NAME because this build doesn't use CPACK
# preparation for sshd
RUN ssh-keygen -A
RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
# install mscp python module
RUN cd ${mscpdir} \
&& python3 setup.py install --user

View File

@@ -1,16 +1,7 @@
FROM centos:8
ARG mscpdir="/mscp"
COPY . ${mscpdir}
# from https://stackoverflow.com/questions/70963985/error-failed-to-download-metadata-for-repo-appstream-cannot-prepare-internal
RUN cd /etc/yum.repos.d/
RUN sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
RUN sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
FROM rockylinux:8.8
# install pytest, sshd for test, and rpm-build
RUN set -ex && yum -y update && yum -y install \
RUN set -ex && yum -y install \
python3 python3-pip python3-devel openssh openssh-server openssh-clients rpm-build
RUN python3 -m pip install pytest
@@ -22,13 +13,17 @@ RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
ARG mscpdir="/mscp"
COPY . ${mscpdir}
# install build dependency
RUN ${mscpdir}/scripts/install-build-deps.sh
# build
RUN cd ${mscpdir} \
&& rm -rf build \
&& cmake -B build \
&& cmake -B build \
&& cd ${mscpdir}/build \
&& make \
&& cpack -G RPM CPackConfig.cmake \
@@ -37,3 +32,4 @@ RUN cd ${mscpdir} \
# install mscp python module
RUN cd ${mscpdir} \
&& python3 setup.py install --user

View File

@@ -1,10 +1,6 @@
FROM ubuntu:20.04
ARG DEBIAN_FRONTEND=noninteractive
ARG mscpdir="/mscp"
COPY . ${mscpdir}
RUN set -ex && apt-get update && apt-get install -y --no-install-recommends \
ca-certificates
@@ -21,6 +17,10 @@ RUN mkdir /var/run/sshd \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
ARG mscpdir="/mscp"
COPY . ${mscpdir}
# install build dependency
RUN ${mscpdir}/scripts/install-build-deps.sh

View File

@@ -1,10 +1,6 @@
FROM ubuntu:22.04
ARG DEBIAN_FRONTEND=noninteractive
ARG mscpdir="/mscp"
COPY . ${mscpdir}
RUN set -ex && apt-get update && apt-get install -y --no-install-recommends \
ca-certificates
@@ -20,6 +16,9 @@ RUN mkdir /var/run/sshd \
&& ssh-keygen -f /root/.ssh/id_rsa -N "" \
&& mv /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
ARG mscpdir="/mscp"
COPY . ${mscpdir}
# install build dependency
RUN ${mscpdir}/scripts/install-build-deps.sh

View File

@@ -184,7 +184,7 @@ int mscp_scan_join(struct mscp *m);
*
* @param m mscp instance.
*
* @return 0 on success, < 0 if an error occured.
* @return number of threads on success, < 0 if an error occured.
* mscp_get_error() can be used to retrieve error message.
*
* @see mscp_join()

View File

@@ -19,7 +19,7 @@ case $platform in
apt-get install -y \
gcc make cmake zlib1g-dev libssl-dev libkrb5-dev
;;
Linux-centos* | Linux-rhel* | Linux-rocky*)
Linux-centos* | Linux-rhel* | Linux-rocky* | Linux-almalinux)
yum install -y \
gcc make cmake zlib-devel openssl-devel rpm-build
;;

View File

@@ -14,10 +14,10 @@ case $release in
ubuntu-22.04*)
echo "libc6 (>= 2.33), libgssapi-krb5-2 (>= 1.17), libssl3 (>= 3.0.0~~alpha1), zlib1g (>= 1:1.1.4)"
;;
centos* | rhel* | rocky*)
centos* | rhel* | rocky* | almalinux*)
echo "glibc crypto-policies krb5-libs openssl-libs libcom_err"
;;
*)
echo "unsupported install dependency: $release"
echo "$(basename $0): unsupported install dependency: $release"
exit 1
esac

View File

@@ -11,6 +11,7 @@ set -x
# Run sshd
if [ ! -e /var/run/sshd.pid ]; then
/usr/sbin/sshd
sleep 1
fi
ssh-keyscan localhost >> ${HOME}/.ssh/known_hosts

View File

@@ -20,6 +20,8 @@ static inline refcnt refcnt_dec(refcnt *cnt)
}
/* mutex */
typedef pthread_mutex_t lock;
static inline void lock_init(lock *l)
@@ -44,12 +46,58 @@ static inline void lock_release_via_cleanup(void *l)
lock_release(l);
}
#define LOCK_ACQUIRE_THREAD(l) \
lock_acquire(l); \
pthread_cleanup_push(lock_release_via_cleanup, l)
#define LOCK_ACQUIRE(l) \
lock_acquire(l); \
pthread_cleanup_push(lock_release_via_cleanup, l)
#define LOCK_RELEASE_THREAD() \
#define LOCK_RELEASE() \
pthread_cleanup_pop(1)
/* read/write lock */
typedef pthread_rwlock_t rwlock;
static inline void rwlock_init(rwlock *rw)
{
pthread_rwlock_init(rw, NULL);
}
static inline void rwlock_read_acquire(rwlock *rw)
{
int ret = pthread_rwlock_rdlock(rw);
assert(ret == 0);
}
static inline void rwlock_write_acquire(rwlock *rw)
{
int ret = pthread_rwlock_wrlock(rw);
assert(ret == 0);
}
static inline void rwlock_release(rwlock *rw)
{
int ret = pthread_rwlock_unlock(rw);
assert(ret == 0);
}
static inline void rwlock_release_via_cleanup(void *rw)
{
rwlock_release(rw);
}
#define RWLOCK_READ_ACQUIRE(rw) \
rwlock_read_acquire(rw); \
pthread_cleanup_push(rwlock_release_via_cleanup, rw)
#define RWLOCK_WRITE_ACQUIRE(rw) \
rwlock_write_acquire(rw); \
pthread_cleanup_push(rwlock_release_via_cleanup, rw)
#define RWLOCK_RELEASE() \
pthread_cleanup_pop(1)
#endif /* _ATOMIC_H_ */

View File

@@ -380,11 +380,11 @@ int main(int argc, char **argv)
ret = mscp_start(m);
if (ret < 0)
fprintf(stderr, "%s\n", mscp_get_error());
fprintf(stderr, "mscp_start: %s\n", mscp_get_error());
ret = mscp_join(m);
if (ret != 0)
fprintf(stderr, "%s\n", mscp_get_error());
fprintf(stderr, "mscp_join: %s\n", mscp_get_error());
pthread_cancel(tid_stat);
pthread_join(tid_stat, NULL);
@@ -421,7 +421,8 @@ double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
return (double)diff / calculate_timedelta(b, a);
}
char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeval *a)
char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeval *a,
bool final)
{
static char buf[16];
double elapsed = calculate_timedelta(b, a);
@@ -429,7 +430,10 @@ char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeva
if (diff == 0)
snprintf(buf, sizeof(buf), "--:-- ETA");
else {
else if (final) {
snprintf(buf, sizeof(buf), "%02d:%02d ",
(int)(floor(elapsed / 60)), (int)round(elapsed) % 60);
} else {
eta = remain / (diff / elapsed);
snprintf(buf, sizeof(buf), "%02d:%02d ETA",
(int)floor(eta / 60), (int)round(eta) % 60);
@@ -474,7 +478,7 @@ void print_progress_bar(double percent, char *suffix)
}
void print_progress(struct timeval *b, struct timeval *a,
size_t total, size_t last, size_t done)
size_t total, size_t last, size_t done, bool final)
{
char *bps_units[] = { "B/s ", "KB/s", "MB/s", "GB/s" };
char *byte_units[] = { "B ", "KB", "MB", "GB", "TB", "PB" };
@@ -509,7 +513,8 @@ void print_progress(struct timeval *b, struct timeval *a,
snprintf(suffix, sizeof(suffix), "%4lu%s/%lu%s %6.1f%s %s",
done_round, byte_units[byte_du], total_round, byte_units[byte_tu],
bps, bps_units[bps_u], calculate_eta(total - done, done - last, b, a));
bps, bps_units[bps_u],
calculate_eta(total - done, done - last, b, a, final));
print_progress_bar(percent, suffix);
}
@@ -533,7 +538,7 @@ void print_stat_thread_cleanup(void *arg)
x.done = s.done;
/* print progress from the beginning */
print_progress(&x.start, &x.after, x.total, 0, x.done);
print_progress(&x.start, &x.after, x.total, 0, x.done, true);
print_cli("\n"); /* final output */
}
@@ -572,7 +577,8 @@ void *print_stat_thread(void *arg)
x.total = s.total;
x.done = s.done;
print_progress(&x.before, &x.after, x.total, x.last, x.done);
print_progress(&x.before, &x.after, x.total, x.last, x.done,
false);
x.before = x.after;
x.last = x.done;
}

View File

@@ -40,11 +40,15 @@ struct mscp {
int ret_scan; /* return code from scan thread */
size_t total_bytes; /* total bytes to be transferred */
struct mscp_thread *threads;
struct list_head thread_list;
rwlock thread_rwlock;
};
struct mscp_thread {
struct list_head list; /* mscp->thread_list */
struct mscp *m;
int id;
sftp_session sftp;
@@ -56,7 +60,7 @@ struct mscp_thread {
};
struct src {
struct list_head list;
struct list_head list; /* mscp->src_list */
char *path;
};
@@ -211,7 +215,7 @@ struct mscp *mscp_init(const char *remote_host, int direction,
int n;
if (!remote_host) {
mscp_set_error("empty remote host\n");
mscp_set_error("empty remote host");
return NULL;
}
@@ -238,6 +242,9 @@ struct mscp *mscp_init(const char *remote_host, int direction,
INIT_LIST_HEAD(&m->path_list);
chunk_pool_init(&m->cp);
INIT_LIST_HEAD(&m->thread_list);
rwlock_init(&m->thread_rwlock);
if ((m->sem = sem_create(o->max_startups)) == NULL) {
mscp_set_error("sem_create: %s", strerrno());
goto free_out;
@@ -339,11 +346,14 @@ static int get_page_mask(void)
static void mscp_stop_copy_thread(struct mscp *m)
{
int n;
for (n = 0; n < m->opts->nr_threads; n++) {
if (m->threads[n].tid && !m->threads[n].finished)
pthread_cancel(m->threads[n].tid);
}
struct mscp_thread *t;
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
list_for_each_entry(t, &m->thread_list, list) {
if (!t->finished)
pthread_cancel(t->tid);
}
RWLOCK_RELEASE();
}
static void mscp_stop_scan_thread(struct mscp *m)
@@ -427,16 +437,14 @@ void *mscp_scan_thread(void *arg)
list_splice_tail(&tmp, m->path_list.prev);
}
chunk_pool_set_filled(&m->cp);
mpr_info(m->msg_fp, "walk source path(s) done\n");
chunk_pool_set_filled(&m->cp);
m->ret_scan = 0;
return NULL;
err_out:
chunk_pool_set_filled(&m->cp);
m->ret_scan = -1;
mscp_stop_copy_thread(m);
return NULL;
}
@@ -450,10 +458,10 @@ int mscp_scan(struct mscp *m)
return -1;
}
/* need scan finished or over nr_threads chunks to determine
* actual number of threads (and connections). If the number
* of chunks are smaller than nr_threads, we adjust nr_threads
* to the number of chunks.
/* We wait for there are over nr_threads chunks to determine
* actual number of threads (and connections), or scan
* finished. If the number of chunks are smaller than
* nr_threads, we adjust nr_threads to the number of chunks.
*/
while (!chunk_pool_is_filled(&m->cp) &&
chunk_pool_size(&m->cp) < m->opts->nr_threads)
@@ -476,9 +484,40 @@ int mscp_scan_join(struct mscp *m)
static void *mscp_copy_thread(void *arg);
static struct mscp_thread *mscp_copy_thread_spawn(struct mscp *m, int id)
{
struct mscp_thread *t;
int ret;
t = malloc(sizeof(*t));
if (!t){
mscp_set_error("malloc: %s,", strerrno());
return NULL;
}
memset(t, 0, sizeof(*t));
t->m = m;
t->id = id;
if (m->cores == NULL)
t->cpu = -1; /* not pinned to cpu */
else
t->cpu = m->cores[id % m->nr_cores];
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
if (ret < 0) {
mscp_set_error("pthread_create error: %d", ret);
free(t);
return NULL;
}
return t;
}
int mscp_start(struct mscp *m)
{
int n, ret;
struct mscp_thread *t;
int n, ret = 0;
if ((n = chunk_pool_size(&m->cp)) < m->opts->nr_threads) {
mpr_notice(m->msg_fp, "we have only %d chunk(s). "
@@ -486,63 +525,46 @@ int mscp_start(struct mscp *m)
m->opts->nr_threads = n;
}
/* scan thread instances */
m->threads = calloc(m->opts->nr_threads, sizeof(struct mscp_thread));
memset(m->threads, 0, m->opts->nr_threads * sizeof(struct mscp_thread));
for (n = 0; n < m->opts->nr_threads; n++) {
struct mscp_thread *t = &m->threads[n];
t->m = m;
t->id = n;
if (!m->cores)
t->cpu = -1;
else
t->cpu = m->cores[n % m->nr_cores];
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
if (ret < 0) {
mscp_set_error("pthread_create error: %d", ret);
mscp_stop(m);
return -1;
}
t = mscp_copy_thread_spawn(m, n);
if (!t) {
mpr_err(m->msg_fp, "failed to spawn copy thread\n");
break;
}
RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock);
list_add_tail(&t->list, &m->thread_list);
RWLOCK_RELEASE();
}
return 0;
return n;
}
int mscp_join(struct mscp *m)
{
struct mscp_thread *t;
int n, ret = 0;
/* waiting for scan thread joins... */
ret = mscp_scan_join(m);
/* waiting for copy threads join... */
for (n = 0; n < m->opts->nr_threads; n++) {
if (m->threads[n].tid) {
pthread_join(m->threads[n].tid, NULL);
if (m->threads[n].ret < 0)
ret = m->threads[n].ret;
}
}
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
list_for_each_entry(t, &m->thread_list, list) {
pthread_join(t->tid, NULL);
if (t->ret < 0)
ret = t->ret;
if (t->sftp) {
ssh_sftp_close(t->sftp);
t->sftp = NULL;
}
}
RWLOCK_RELEASE();
if (m->first) {
ssh_sftp_close(m->first);
m->first = NULL;
}
if (m->threads) {
for (n = 0; n < m->opts->nr_threads; n++) {
struct mscp_thread *t = &m->threads[n];
if (t->ret != 0)
ret = ret;
if (t->sftp) {
ssh_sftp_close(t->sftp);
t->sftp = NULL;
}
}
}
return ret;
}
@@ -569,7 +591,7 @@ void *mscp_copy_thread(void *arg)
}
if (sem_wait(m->sem) < 0) {
mscp_set_error("sem_wait: %s\n", strerrno());
mscp_set_error("sem_wait: %s", strerrno());
mpr_err(m->msg_fp, "%s", mscp_get_error());
goto err_out;
}
@@ -579,7 +601,7 @@ void *mscp_copy_thread(void *arg)
t->sftp = ssh_init_sftp_session(m->remote, m->ssh_opts);
if (sem_post(m->sem) < 0) {
mscp_set_error("sem_post: %s\n", strerrno());
mscp_set_error("sem_post: %s", strerrno());
mpr_err(m->msg_fp, "%s", mscp_get_error());
goto err_out;
}
@@ -631,6 +653,7 @@ void *mscp_copy_thread(void *arg)
return NULL;
err_out:
t->finished = true;
t->ret = -1;
return NULL;
}
@@ -660,6 +683,13 @@ static void free_chunk(struct list_head *list)
free(c);
}
static void free_thread(struct list_head *list)
{
struct mscp_thread *t;
t = list_entry(list, typeof(*t), list);
free(t);
}
void mscp_cleanup(struct mscp *m)
{
if (m->first) {
@@ -676,10 +706,9 @@ void mscp_cleanup(struct mscp *m)
chunk_pool_release(&m->cp);
chunk_pool_init(&m->cp);
if (m->threads) {
free(m->threads);
m->threads = NULL;
}
RWLOCK_WRITE_ACQUIRE(&m->thread_rwlock);
list_free_f(&m->thread_list, free_thread);
RWLOCK_RELEASE();
}
void mscp_free(struct mscp *m)
@@ -696,16 +725,19 @@ void mscp_free(struct mscp *m)
void mscp_get_stats(struct mscp *m, struct mscp_stats *s)
{
struct mscp_thread *t;
bool finished = true;
int n;
s->total = m->total_bytes;
for (s->done = 0, n = 0; n < m->opts->nr_threads; n++) {
s->done += m->threads[n].done;
s->done = 0;
if (!m->threads[n].done)
RWLOCK_READ_ACQUIRE(&m->thread_rwlock);
list_for_each_entry(t, &m->thread_list, list) {
s->done += t->done;
if (!t->finished)
finished = false;
}
RWLOCK_RELEASE();
s->finished = finished;
}

View File

@@ -27,10 +27,10 @@ void chunk_pool_init(struct chunk_pool *cp)
static void chunk_pool_add(struct chunk_pool *cp, struct chunk *c)
{
LOCK_ACQUIRE_THREAD(&cp->lock);
LOCK_ACQUIRE(&cp->lock);
list_add_tail(&c->list, &cp->list);
cp->count += 1;
LOCK_RELEASE_THREAD();
LOCK_RELEASE();
}
void chunk_pool_set_filled(struct chunk_pool *cp)
@@ -54,7 +54,7 @@ struct chunk *chunk_pool_pop(struct chunk_pool *cp)
struct list_head *first;
struct chunk *c = NULL;
LOCK_ACQUIRE_THREAD(&cp->lock);
LOCK_ACQUIRE(&cp->lock);
first = cp->list.next;
if (list_empty(&cp->list)) {
if (!chunk_pool_is_filled(cp))
@@ -65,7 +65,7 @@ struct chunk *chunk_pool_pop(struct chunk_pool *cp)
c = list_entry(first, struct chunk, list);
list_del(first);
}
LOCK_RELEASE_THREAD();
LOCK_RELEASE();
/* return CHUNK_POP_WAIT would be very rare case, because it
* means copying over SSH is faster than traversing
@@ -363,7 +363,7 @@ static int prepare_dst_path(FILE *msg_fp, struct path *p, sftp_session dst_sftp)
{
int ret = 0;
LOCK_ACQUIRE_THREAD(&p->lock);
LOCK_ACQUIRE(&p->lock);
if (p->state == FILE_STATE_INIT) {
if (touch_dst_path(p, dst_sftp) < 0) {
ret = -1;
@@ -374,7 +374,7 @@ static int prepare_dst_path(FILE *msg_fp, struct path *p, sftp_session dst_sftp)
}
out:
LOCK_RELEASE_THREAD();
LOCK_RELEASE();
return ret;
}

View File

@@ -201,11 +201,16 @@ static int mscp_stat(const char *path, mstat *s, sftp_session sftp)
if (sftp) {
s->r = sftp_stat(sftp, path);
if (!s->r)
if (!s->r) {
mscp_set_error("sftp_stat: %s %s",
sftp_get_ssh_error(sftp), path);
return -1;
}
} else {
if (stat(path, &s->l) < 0)
if (stat(path, &s->l) < 0) {
mscp_set_error("stat: %s %s", strerrno(), path);
return -1;
}
}
return 0;

View File

@@ -101,7 +101,7 @@ sem_t *sem_create(int value)
if (sem_init(sem, 0, value) < 0) {
free(sem);
return sem;
return NULL;
}
return sem;

View File

@@ -61,6 +61,11 @@ def test_single_copy(mscp, src_prefix, dst_prefix, src, dst):
src.cleanup()
dst.cleanup()
@pytest.mark.parametrize("src_prefix, dst_prefix", param_remote_prefix)
def test_failed_to_copy_nonexistent_file(mscp, src_prefix, dst_prefix):
src = "nonexistent_src"
dst = "nonexistent_dst"
run2ng([mscp, "-H", src_prefix + src, dst_prefix + dst])
param_double_copy = [
(File("src1", size = 1024 * 1024), File("src2", size = 1024 * 1024),
@@ -139,7 +144,7 @@ def test_min_chunk(mscp, src_prefix, dst_prefix):
src = File("src", size = 16 * 1024).make()
dst = File("dst")
run2ok([mscp, "-H", "-s", 8192, src_prefix + src.path, dst_prefix + dst.path])
run2ok([mscp, "-H", "-s", 32768, src_prefix + src.path, dst_prefix + dst.path])
assert check_same_md5sum(src, dst)
src.cleanup()
@@ -150,7 +155,7 @@ def test_thread_affinity(mscp, src_prefix, dst_prefix):
src = File("src", size = 64 * 1024).make()
dst = File("dst")
run2ok([mscp, "-H", "-n", 4, "-m", "0x01", "-s", 8192, "-S", 65536,
run2ok([mscp, "-H", "-n", 4, "-m", "0x01",
src_prefix + src.path, dst_prefix + dst.path])
assert check_same_md5sum(src, dst)

View File

@@ -104,6 +104,12 @@ def test_login_failed():
with pytest.raises(RuntimeError) as e:
m.connect()
def test_get_stat_before_copy_start():
m = mscp.mscp("localhost", mscp.LOCAL2REMOTE)
m.connect()
(total, done, finished) = m.stats()
assert total == 0 and done == 0
param_invalid_kwargs = [
{ "nr_threads": -1 },