implement local-to-remote copy with async_write

This commit is contained in:
Ryo Nakamura
2022-11-17 21:46:21 +09:00
parent a2b4a4c7b3
commit d448f9eb8a
5 changed files with 89 additions and 11 deletions

View File

@@ -20,6 +20,7 @@ target_include_directories(mscp PUBLIC ./src)
if (LIBSSH_PATH) if (LIBSSH_PATH)
find_package(GSSAPI) find_package(GSSAPI)
target_include_directories(mscp PUBLIC ${LIBSSH_PATH}/include) target_include_directories(mscp PUBLIC ${LIBSSH_PATH}/include)
target_compile_options(mscp PUBLIC -iquote ${LIBSSH_PATH}/include)
target_link_directories(mscp PRIVATE ${LIBSSH_PATH}/lib) target_link_directories(mscp PRIVATE ${LIBSSH_PATH}/lib)
target_link_libraries(mscp pthread m libssh.a ssl crypto z ${GSSAPI_LIBRARIES}) target_link_libraries(mscp pthread m libssh.a ssl crypto z ${GSSAPI_LIBRARIES})
else() else()
@@ -27,6 +28,10 @@ else()
target_link_libraries(mscp pthread m ssh) target_link_libraries(mscp pthread m ssh)
endif() endif()
if (WITH_ASYNC_WRITE)
target_compile_definitions(mscp PUBLIC ASYNC_WRITE=1)
endif()
target_compile_definitions(mscp PUBLIC _VERSION="${PROJECT_VERSION}") target_compile_definitions(mscp PUBLIC _VERSION="${PROJECT_VERSION}")
install(TARGETS mscp install(TARGETS mscp

View File

@@ -192,6 +192,6 @@ make && make install
# mv to mscp dir # mv to mscp dir
mv ../.. mv ../..
mkdir build && cd build mkdir build && cd build
cmake .. -DLIBSSH_PATH=../libssh-installed cmake .. -DLIBSSH_PATH=$(pwd)/../libssh-installed -DWITH_ASYNC_WRITE=1
make make
``` ```

View File

@@ -653,11 +653,80 @@ static int chunk_copy_internal_local_to_remote(struct chunk *c, int fd, sftp_fil
return 0; return 0;
} }
#define XFER_BUF_SIZE 16384
#ifdef ASYNC_WRITE
static int chunk_copy_internal_local_to_remote_async(struct chunk *c, int fd,
sftp_file sf, int nr_ahead,
size_t *counter)
{
size_t read_bytes, remaind, thrown;
int idx, ret;
struct {
int id;
size_t len;
char buf[XFER_BUF_SIZE];
} reqs[nr_ahead];
if (c->len == 0)
return 0;
remaind = thrown = c->len;
for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
reqs[idx].len = min(thrown, XFER_BUF_SIZE);
/* TODO: should use iovec? */
read_bytes = read(fd, reqs[idx].buf, reqs[idx].len);
if (read_bytes < 0) {
pr_err("read from %s failed: %s\n", c->f->src_path, strerrno());
return -1;
}
ret = sftp_async_write(sf, reqs[idx].buf, reqs[idx].len, &reqs[idx].id);
if (ret < 0) {
pr_err("sftp_async_write for %s failed: %d\n",
c->f->dst_path, sftp_get_error(sf->sftp));
return -1;
}
}
for (idx = 0; remaind > 0;) {
/* TODO: should be non-blocking */
ret = sftp_async_write_end(sf, reqs[idx].id, 1);
if (ret != SSH_OK) {
pr_err("sftp_async_write_end failed for %s: %d\n",
c->f->dst_path, sftp_get_error(sf->sftp));
return -1;
}
remaind -= reqs[idx].len;
*counter += reqs[idx].len;
if (remaind == 0)
break;
reqs[idx].len = min(remaind, XFER_BUF_SIZE);
read_bytes = read(fd, reqs[idx].buf, reqs[idx].len);
if (read_bytes < 0) {
pr_err("read from %s failed: %s\n", c->f->src_path, strerrno());
return -1;
}
ret = sftp_async_write(sf, reqs[idx].buf, reqs[idx].len, &reqs[idx].id);
if (ret < 0) {
pr_err("sftp_async_write for %s failed: %d\n",
c->f->dst_path, sftp_get_error(sf->sftp));
return -1;
}
idx = (idx + 1) & nr_ahead;
}
return 0;
}
#endif
static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_file sf, static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_file sf,
int nr_ahead, size_t *counter) int nr_ahead, size_t *counter)
{ {
#define XFER_BUF_SIZE 16384
ssize_t read_bytes, write_bytes, remaind, thrown; ssize_t read_bytes, write_bytes, remaind, thrown;
char buf[XFER_BUF_SIZE]; char buf[XFER_BUF_SIZE];
int idx; int idx;
@@ -725,7 +794,7 @@ static int chunk_copy_internal_remote_to_local(struct chunk *c, int fd, sftp_fil
static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp, static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp,
size_t sftp_buf_sz, size_t io_buf_sz, size_t sftp_buf_sz, size_t io_buf_sz,
size_t *counter) int nr_ahead, size_t *counter)
{ {
struct file *f = c->f; struct file *f = c->f;
sftp_file sf = NULL; sftp_file sf = NULL;
@@ -748,8 +817,12 @@ static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp,
goto out; goto out;
} }
#ifndef ASYNC_WRITE
ret = chunk_copy_internal_local_to_remote(c, fd, sf, sftp_buf_sz, io_buf_sz, ret = chunk_copy_internal_local_to_remote(c, fd, sf, sftp_buf_sz, io_buf_sz,
counter); counter);
#else
ret = chunk_copy_internal_local_to_remote_async(c, fd, sf, nr_ahead, counter);
#endif
if (ret < 0) if (ret < 0)
goto out; goto out;
@@ -823,8 +896,8 @@ int chunk_copy(struct chunk *c, sftp_session sftp, size_t sftp_buf_sz, size_t io
if (f->dst_is_remote) if (f->dst_is_remote)
ret = chunk_copy_local_to_remote(c, sftp, ret = chunk_copy_local_to_remote(c, sftp, sftp_buf_sz, io_buf_sz,
sftp_buf_sz, io_buf_sz, counter); nr_ahead, counter);
else else
ret = chunk_copy_remote_to_local(c, sftp, nr_ahead, counter); ret = chunk_copy_remote_to_local(c, sftp, nr_ahead, counter);

View File

@@ -3,8 +3,8 @@
#include <limits.h> #include <limits.h>
#include <pthread.h> #include <pthread.h>
#include <libssh/libssh.h> #include "libssh/libssh.h"
#include <libssh/sftp.h> #include "libssh/sftp.h"
#include <list.h> #include <list.h>
#include <atomic.h> #include <atomic.h>

View File

@@ -2,8 +2,8 @@
#define _SSH_H_ #define _SSH_H_
#include <stdbool.h> #include <stdbool.h>
#include <libssh/libssh.h> #include "libssh/libssh.h"
#include <libssh/sftp.h> #include "libssh/sftp.h"
struct ssh_opts { struct ssh_opts {