25 Commits

Author SHA1 Message Date
Ryo Nakamura
a8db569fbd bump version to 0.0.5 and update README 2022-12-11 14:20:13 +09:00
Ryo Nakamura
3d98451bba set default nr_threads to floor(log(cores) * 2) + 1)
This change prevents mscp from establishing too many ssh connections
on many-core machines in default.
2022-12-11 14:01:52 +09:00
Ryo Nakamura
d27db01d8d use pthread_cleanup to acquire and release lock
In chunk_prepare(), if multiple threads wait for acquiring f->lock,
and then pthread_cancel() is called, the waiting threads are never
canceled because pthread_mutex_lock() is not a cancellation point.
So, use pthread_cleanup_push/pop to release the lock.
2022-12-11 13:23:41 +09:00
Ryo Nakamura
45cde99a85 allocate headroom for SFTP header
This commit makes ssh_buffer_new_size() can insert headroom. This
headroom can eliminate memcpy involved in ssh_buffer_prepend_data()
for inserting SFTP common header.
2022-12-10 21:48:24 +09:00
Ryo Nakamura
6ae3f0f9f1 set default NR_AHEAD to 32 2022-12-08 18:01:50 +09:00
Ryo Nakamura
847c80276a fix final progress output 2022-12-06 20:04:04 +09:00
Ryo Nakamura
c4ea9a1e78 add ssh_buffer_new_size and ssh_buffer_add_func to libssh
sftp_async_write() with these functions reduces
  1. realloc_buffer by ssh_buffer_new_size()
  2. memcpy from read data to ssh buffer by ssh_buffer_add_func()
2022-12-06 15:02:14 +09:00
Ryo Nakamura
289293e812 change prompt for ssh key passphrase 2022-12-05 22:27:53 +09:00
Ryo Nakamura
1441873db6 reuse ctrl sftp session for the first copy thread 2022-12-05 21:47:00 +09:00
Ryo Nakamura
a2caa93d2a update libssh build options 2022-12-05 19:46:02 +09:00
Ryo Nakamura
e1d14623f4 set TCP_NODELAY by default and introduce -N option to disable it 2022-12-04 21:32:48 +09:00
Ryo Nakamura
3b794ab51b remove unused code and introduce -b buf_sz option
This commit removes ifdef ASYNC_WRITE. So, mscp always depends on
the patched libssh.
2022-12-03 20:48:43 +09:00
Ryo Nakamura
50c6781811 little cleanup 2022-12-02 23:35:45 +09:00
Ryo Nakamura
5846c6b6a9 cache passphrase for private key for later connections. 2022-12-02 23:20:23 +09:00
Ryo Nakamura
03a3a6dc4b add auth callback for input passphrase of privkey 2022-12-02 22:28:56 +09:00
Ryo Nakamura
03b857b51a add -M hmac option 2022-12-02 21:13:13 +09:00
Ryo Nakamura
d646fc1f89 use sigalrm for printing progress bar 2022-11-28 00:14:05 +09:00
Ryo Nakamura
5188cf6df6 add ETA to progress print 2022-11-27 20:36:24 +09:00
Ryo Nakamura
130e735e65 skip sftp_free() inappropriately 2022-11-27 19:48:13 +09:00
Ryo Nakamura
e3ed4f89d2 update REAMDE 2022-11-27 00:37:07 +09:00
Ryo Nakamura
db1431ed6a only the last thread changes dst file permission 2022-11-27 00:06:39 +09:00
Ryo Nakamura
bf3ee25bae add libssh-0.9.6.patch 2022-11-26 23:16:56 +09:00
Ryo Nakamura
8cc964ca8a fix duplicate error message on ssh auth failed.
and fix the final \n with -q
2022-11-26 17:34:24 +09:00
Ryo Nakamura
e0fe88c9c4 update README for v0.0.4 2022-11-26 01:30:25 +09:00
Ryo Nakamura
73cfee29aa bump version to 0.0.4
mistake for v0.0.3 releasing...
2022-11-26 00:48:24 +09:00
11 changed files with 778 additions and 431 deletions

View File

@@ -40,8 +40,6 @@ list(APPEND MSCP_LINK_LIBS ${OPENSSL_LIBRARIES})
find_package(ZLIB)
list(APPEND MSCP_LINK_LIBS ${ZLIB_LIBRARIES})
target_compile_definitions(mscp PUBLIC ASYNC_WRITE=1)
target_include_directories(mscp PRIVATE ${MSCP_INCLUDE_DIRS})
target_link_directories(mscp PRIVATE ${MSCP_LINK_DIRS})
target_link_libraries(mscp PRIVATE ${MSCP_LINK_LIBS})

103
README.md
View File

@@ -11,16 +11,17 @@ over networks.
You can use `mscp` like `scp`, for example, `mscp
user@example.com:srcfile /tmp/dstfile`. Remote hosts only need to run
standard `sshd` supporting the SFTP subsystem, and you need to be able
to ssh to the hosts (as usual). `mscp` does not require anything else.
standard `sshd` supporting the SFTP subsystem (e.g. openssh-server),
and you need to be able to ssh to the hosts as usual. `mscp` does not
require anything else.
Differences from `scp`:
Differences from `scp` on usage:
- remote glob on remote shell expansion is not supported.
- remote to remote copy is not supported.
- `-r` option is not needed.
- and any other differences I have not implemented and noticed...
- and any other differences I have not implemented and noticed.
## Install
@@ -47,18 +48,18 @@ async](https://archive.libssh.org/libssh/2020-06/0000004.html)).
Currently macOS and Linux (Ubuntu, CentOS, Rocky) are supported.
```console
# 1. clone this repository
# clone this repository
git clone https://github.com/upa/mscp.git
cd mscp
# 2. prepare patched libssh
# prepare patched libssh
git submodule update --init
patch -d libssh -p1 < patch/libssh-0.10.4.patch
# 3. install build dependency
# install build dependency
bash ./scripts/install-build-deps.sh
# 4. configure mscp
# configure mscp
mkdir build && mv build
cmake ..
@@ -78,75 +79,77 @@ make install
```console
$ mscp
mscp v0.0.2: copy files over multiple ssh connections
mscp v0.0.5: copy files over multiple ssh connections
Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead]
Usage: mscp [vqDCHdNh] [-n nr_conns] [-m coremask]
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
[-l login_name] [-p port] [-i identity_file]
[-c cipher_spec] source ... target
[-c cipher_spec] [-M hmac_spec] source ... target
```
- Example: copy an 8GB file on tmpfs over a 100Gbps link
- Example: copy a 15GB file on memory over a 100Gbps link
- Two Intel Xeon Gold 6130 machines directly connected with Intel E810 100Gbps NICs.
- Default `openssh-server` runs on the remote host.
```console
$ mscp /tmp/test.img 10.0.0.1:/tmp/
[=====================================================] 100% 8GB/8GB 3.02GB/s
$ mscp /var/ram/test.img 10.0.0.1:/var/ram/
[======================================] 100% 15GB/15GB 1.7GB/s 00:00 ETA
```
```console
# with some optimizations. top speed reaches 3.0GB/s.
$ mscp -n 5 -m 0x1f -c aes128-gcm@openssh.com /var/ram/test.img 10.0.0.1:/var/ram/
[======================================] 100% 15GB/15GB 2.4GB/s 00:00 ETA
```
- `-v` option increments verbose output level.
```console
$ mscp test 10.0.0.1:
[=====================================================] 100% 13B/13B 2.41KB/s
[======================================] 100% 26B /26B 6.3KB/s 00:00 ETA
```
$ mscp -v test 10.0.0.1:
file test/test.txt (local) -> ./test/test.txt (remote) 9B
file test/test2/2.txt (local) -> ./test/test2/2.txt (remote) 2B
file test/1.txt (local) -> ./test/1.txt (remote) 2B
copy start: test/test.txt
copy start: test/1.txt
copy start: test/test2/2.txt
copy done: test/1.txt
copy done: test/test2/2.txt
copy done: test/test.txt
[=====================================================] 100% 13B/13B 2.51KB/s
$ mscp -vv -n 4 test 10.0.0.1:
```console
$ mscp -vv test 10.0.0.1:
number of connections: 7
connecting to 10.0.0.1 for checking destinations...
file test/test.txt (local) -> ./test/test.txt (remote) 9B
file test/test2/2.txt (local) -> ./test/test2/2.txt (remote) 2B
file test/1.txt (local) -> ./test/1.txt (remote) 2B
file test/testdir/asdf (local) -> ./test/testdir/asdf (remote) 9B
file test/testdir/qwer (local) -> ./test/testdir/qwer (remote) 5B
file test/test1 (local) -> ./test/test1 (remote) 6B
file test/test2 (local) -> ./test/test2 (remote) 6B
we have only 4 chunk(s). set number of connections to 4
connecting to 10.0.0.1 for a copy thread...
connecting to 10.0.0.1 for a copy thread...
connecting to 10.0.0.1 for a copy thread...
connecting to 10.0.0.1 for a copy thread...
copy start: test/test.txt
copy start: test/1.txt
copy start: test/test2/2.txt
copy done: test/test.txt
copy done: test/test2/2.txt
copy done: test/1.txt
[=====================================================] 100% 13B/13B 3.27KB/s
copy start: test/test1
copy start: test/test2
copy done: test/test1
copy start: test/testdir/asdf
copy done: test/test2
copy start: test/testdir/qwer
copy done: test/testdir/qwer
copy done: test/testdir/asdf
[======================================] 100% 26B /26B 5.2KB/s 00:00 ETA
```
- Full usage
```console
$ mscp -h
mscp v0.0.2: copy files over multiple ssh connections
mscp v0.0.5: copy files over multiple ssh connections
Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead]
Usage: mscp [vqDCHdNh] [-n nr_conns] [-m coremask]
[-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]
[-l login_name] [-p port] [-i identity_file]
[-c cipher_spec] source ... target
[-c cipher_spec] [-M hmac_spec] source ... target
-n NR_CONNECTIONS number of connections (default: half of # of cpu cores)
-n NR_CONNECTIONS number of connections (default: floor(log(cores)*2)+1)
-m COREMASK hex value to specify cores where threads pinned
-s MIN_CHUNK_SIZE min chunk size (default: 64MB)
-S MAX_CHUNK_SIZE max chunk size (default: filesize / nr_conn)
-S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)
-a NR_AHEAD number of inflight SFTP commands (default: 16)
-a NR_AHEAD number of inflight SFTP commands (default: 32)
-b BUF_SZ buffer size for i/o and transfer
-v increment verbose output level
-q disable output
@@ -155,15 +158,15 @@ Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]
-l LOGIN_NAME login name
-p PORT port number
-i IDENTITY identity file for public key authentication
-c CIPHER cipher spec, see `ssh -Q cipher`
-c CIPHER cipher spec
-M HMAC hmac spec
-C enable compression on libssh
-H disable hostkey check
-d increment ssh debug output level
-N disable tcp nodelay (default on)
-h print this help
```
Note: mscp is still under development, and the author is not
responsible for any accidents due to mscp.

View File

@@ -1 +1 @@
0.0.2
0.0.5

View File

@@ -1,8 +1,24 @@
diff --git a/DefineOptions.cmake b/DefineOptions.cmake
index 068db988..6db0fc0f 100644
index 068db988..5fc3c8fc 100644
--- a/DefineOptions.cmake
+++ b/DefineOptions.cmake
@@ -17,7 +17,7 @@ option(UNIT_TESTING "Build with unit tests" OFF)
@@ -1,7 +1,7 @@
option(WITH_GSSAPI "Build with GSSAPI support" ON)
option(WITH_ZLIB "Build with ZLIB support" ON)
option(WITH_SFTP "Build with SFTP support" ON)
-option(WITH_SERVER "Build with SSH server support" ON)
+option(WITH_SERVER "Build with SSH server support" OFF)
option(WITH_DEBUG_CRYPTO "Build with cryto debug output" OFF)
option(WITH_DEBUG_PACKET "Build with packet debug output" OFF)
option(WITH_DEBUG_CALLTRACE "Build with calltrace debug output" ON)
@@ -11,13 +11,13 @@ option(WITH_MBEDTLS "Compile against libmbedtls" OFF)
option(WITH_BLOWFISH_CIPHER "Compile with blowfish support" OFF)
option(WITH_PCAP "Compile with Pcap generation support" ON)
option(WITH_INTERNAL_DOC "Compile doxygen internal documentation" OFF)
-option(BUILD_SHARED_LIBS "Build shared libraries" ON)
+option(BUILD_SHARED_LIBS "Build shared libraries" OFF)
option(WITH_PKCS11_URI "Build with PKCS#11 URI support" OFF)
option(UNIT_TESTING "Build with unit tests" OFF)
option(CLIENT_TESTING "Build with client tests; requires openssh" OFF)
option(SERVER_TESTING "Build with server tests; requires openssh and dropbear" OFF)
option(WITH_BENCHMARKS "Build benchmarks tools" OFF)
@@ -27,15 +43,50 @@ index 068db988..6db0fc0f 100644
+if (WITH_STATIC_LIB)
+ set(BUILD_STATIC_LIB ON)
+endif()
diff --git a/include/libssh/buffer.h b/include/libssh/buffer.h
index a55a1b40..e34e075c 100644
--- a/include/libssh/buffer.h
+++ b/include/libssh/buffer.h
@@ -33,6 +33,8 @@ int ssh_buffer_add_u8(ssh_buffer buffer, uint8_t data);
int ssh_buffer_add_u16(ssh_buffer buffer, uint16_t data);
int ssh_buffer_add_u32(ssh_buffer buffer, uint32_t data);
int ssh_buffer_add_u64(ssh_buffer buffer, uint64_t data);
+ssize_t ssh_buffer_add_func(ssh_buffer buffer, ssh_add_func f, size_t max_bytes,
+ void *userdata);
int ssh_buffer_validate_length(struct ssh_buffer_struct *buffer, size_t len);
diff --git a/include/libssh/libssh.h b/include/libssh/libssh.h
index 7857a77b..3eef7a16 100644
--- a/include/libssh/libssh.h
+++ b/include/libssh/libssh.h
@@ -833,6 +833,7 @@ LIBSSH_API const char* ssh_get_hmac_in(ssh_session session);
LIBSSH_API const char* ssh_get_hmac_out(ssh_session session);
LIBSSH_API ssh_buffer ssh_buffer_new(void);
+LIBSSH_API ssh_buffer ssh_buffer_new_size(uint32_t size, uint32_t headroom);
LIBSSH_API void ssh_buffer_free(ssh_buffer buffer);
#define SSH_BUFFER_FREE(x) \
do { if ((x) != NULL) { ssh_buffer_free(x); x = NULL; } } while(0)
@@ -843,6 +844,8 @@ LIBSSH_API void *ssh_buffer_get(ssh_buffer buffer);
LIBSSH_API uint32_t ssh_buffer_get_len(ssh_buffer buffer);
LIBSSH_API int ssh_session_set_disconnect_message(ssh_session session, const char *message);
+typedef ssize_t (*ssh_add_func) (void *ptr, size_t max_bytes, void *userdata);
+
#ifndef LIBSSH_LEGACY_0_4
#include "libssh/legacy.h"
#endif
diff --git a/include/libssh/sftp.h b/include/libssh/sftp.h
index c855df8a..1fd1710a 100644
index c855df8a..0fcdb9b8 100644
--- a/include/libssh/sftp.h
+++ b/include/libssh/sftp.h
@@ -565,6 +565,9 @@ LIBSSH_API int sftp_async_read(sftp_file file, void *data, uint32_t len, uint32_
@@ -565,6 +565,10 @@ LIBSSH_API int sftp_async_read(sftp_file file, void *data, uint32_t len, uint32_
*/
LIBSSH_API ssize_t sftp_write(sftp_file file, const void *buf, size_t count);
+LIBSSH_API int sftp_async_write(sftp_file file, const void *buf, size_t count, uint32_t* id);
+LIBSSH_API ssize_t sftp_async_write(sftp_file file, ssh_add_func f, size_t count,
+ void *userdata, uint32_t* id);
+LIBSSH_API int sftp_async_write_end(sftp_file file, uint32_t id, int blocking);
+
/**
@@ -57,26 +108,143 @@ index c090fef7..e2f86309 100644
endif (BUILD_STATIC_LIB)
message(STATUS "Threads_FOUND=${Threads_FOUND}")
diff --git a/src/buffer.c b/src/buffer.c
index e0068015..cc0caf35 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -141,6 +141,40 @@ struct ssh_buffer_struct *ssh_buffer_new(void)
return buf;
}
+/**
+ * @brief Create a new SSH buffer with a specified size and headroom.
+ *
+ * @param[in] len length for newly initialized SSH buffer.
+ * @param[in] headroom length for headroom
+ * @return A newly initialized SSH buffer, NULL on error.
+ */
+struct ssh_buffer_struct *ssh_buffer_new_size(uint32_t len, uint32_t headroom)
+{
+ struct ssh_buffer_struct *buf = NULL;
+ int rc;
+
+ if (len < headroom)
+ return NULL;
+
+ buf = calloc(1, sizeof(struct ssh_buffer_struct));
+ if (buf == NULL) {
+ return NULL;
+ }
+
+ rc = ssh_buffer_allocate_size(buf, len);
+ if (rc != 0) {
+ SAFE_FREE(buf);
+ return NULL;
+ }
+
+ buf->pos += headroom;
+ buf->used += headroom;
+
+ buffer_verify(buf);
+
+ return buf;
+}
+
/**
* @brief Deallocate a SSH buffer.
*
@@ -328,6 +362,49 @@ int ssh_buffer_add_data(struct ssh_buffer_struct *buffer, const void *data, uint
return 0;
}
+/**
+ * @brief Add data at the tail of a buffer by an external function
+ *
+ * @param[in] buffer The buffer to add data.
+ *
+ * @param[in] f function that adds data to the buffer.
+ *
+ * @param[in] max_bytes The maximum length of the data to add.
+ *
+ * @return actual bytes added on success, < 0 on error.
+ */
+ssize_t ssh_buffer_add_func(struct ssh_buffer_struct *buffer, ssh_add_func f,
+ size_t max_bytes, void *userdata)
+{
+ ssize_t actual;
+
+ if (buffer == NULL) {
+ return -1;
+ }
+
+ buffer_verify(buffer);
+
+ if (buffer->used + max_bytes < max_bytes) {
+ return -1;
+ }
+
+ if (buffer->allocated < (buffer->used + max_bytes)) {
+ if (buffer->pos > 0) {
+ buffer_shift(buffer);
+ }
+ if (realloc_buffer(buffer, buffer->used + max_bytes) < 0) {
+ return -1;
+ }
+ }
+
+ if ((actual = f(buffer->data + buffer->used, max_bytes, userdata)) < 0)
+ return -1;
+
+ buffer->used += actual;
+ buffer_verify(buffer);
+ return actual;
+}
+
/**
* @brief Ensure the buffer has at least a certain preallocated size.
*
diff --git a/src/sftp.c b/src/sftp.c
index e01012a8..7b5dc249 100644
index e01012a8..702623a0 100644
--- a/src/sftp.c
+++ b/src/sftp.c
@@ -2228,6 +2228,102 @@ ssize_t sftp_write(sftp_file file, const void *buf, size_t count) {
@@ -2228,6 +2228,132 @@ ssize_t sftp_write(sftp_file file, const void *buf, size_t count) {
return -1; /* not reached */
}
+/*
+ * sftp_async_write and sftp_async_write_end are copied from
+ * sftp_async_write is based on and sftp_async_write_end is copied from
+ * https://github.com/limes-datentechnik-gmbh/libssh
+ *
+ * sftp_async_write has some optimizations:
+ * - use ssh_buffer_new_size() to reduce realoc_buffer.
+ * - use ssh_buffer_add_func() to avoid memcpy from read buffer to ssh buffer.
+ */
+int sftp_async_write(sftp_file file, const void *buf, size_t count, uint32_t* id) {
+ssize_t sftp_async_write(sftp_file file, ssh_add_func f, size_t count, void *userdata,
+ uint32_t* id) {
+ sftp_session sftp = file->sftp;
+ ssh_buffer buffer;
+ uint32_t buf_sz;
+ ssize_t actual;
+ int len;
+ int packetlen;
+ int rc;
+
+ buffer = ssh_buffer_new();
+#define HEADROOM 16
+ /* sftp_packet_write() prepends a 5-bytes (uint32_t length and
+ * 1-byte type) header to the head of the payload by
+ * ssh_buffer_prepend_data(). Inserting headroom by
+ * ssh_buffer_new_size() eliminates memcpy for prepending the
+ * header.
+ */
+
+ buf_sz = (HEADROOM + /* for header */
+ sizeof(uint32_t) + /* id */
+ ssh_string_len(file->handle) + 4 + /* file->handle */
+ sizeof(uint64_t) + /* file->offset */
+ sizeof(uint32_t) + /* count */
+ count); /* datastring */
+
+ buffer = ssh_buffer_new_size(buf_sz, HEADROOM);
+ if (buffer == NULL) {
+ ssh_set_error_oom(sftp->session);
+ return -1;
@@ -85,17 +253,25 @@ index e01012a8..7b5dc249 100644
+ *id = sftp_get_new_id(file->sftp);
+
+ rc = ssh_buffer_pack(buffer,
+ "dSqdP",
+ "dSqd",
+ *id,
+ file->handle,
+ file->offset,
+ count, /* len of datastring */
+ (size_t)count, buf);
+ count); /* len of datastring */
+
+ if (rc != SSH_OK){
+ ssh_set_error_oom(sftp->session);
+ ssh_buffer_free(buffer);
+ return SSH_ERROR;
+ }
+
+ actual = ssh_buffer_add_func(buffer, f, count, userdata);
+ if (actual < 0){
+ ssh_set_error_oom(sftp->session);
+ ssh_buffer_free(buffer);
+ return SSH_ERROR;
+ }
+
+ packetlen=ssh_buffer_get_len(buffer)+5;
+ len = sftp_packet_write(file->sftp, SSH_FXP_WRITE, buffer);
+ ssh_buffer_free(buffer);
@@ -109,9 +285,9 @@ index e01012a8..7b5dc249 100644
+ return SSH_ERROR;
+ }
+
+ file->offset += count;
+ file->offset += actual;
+
+ return SSH_OK;
+ return actual;
+}
+
+int sftp_async_write_end(sftp_file file, uint32_t id, int blocking) {

164
patch/libssh-0.9.6.patch Normal file
View File

@@ -0,0 +1,164 @@
diff --git a/DefineOptions.cmake b/DefineOptions.cmake
index b82a5018..f1f2ab9d 100644
--- a/DefineOptions.cmake
+++ b/DefineOptions.cmake
@@ -15,13 +15,14 @@ option(UNIT_TESTING "Build with unit tests" OFF)
option(CLIENT_TESTING "Build with client tests; requires openssh" OFF)
option(SERVER_TESTING "Build with server tests; requires openssh and dropbear" OFF)
option(WITH_BENCHMARKS "Build benchmarks tools" OFF)
-option(WITH_EXAMPLES "Build examples" ON)
+option(WITH_EXAMPLES "Build examples" OFF)
option(WITH_NACL "Build with libnacl (curve25519)" ON)
option(WITH_SYMBOL_VERSIONING "Build with symbol versioning" ON)
option(WITH_ABI_BREAK "Allow ABI break" OFF)
option(WITH_GEX "Enable DH Group exchange mechanisms" ON)
option(FUZZ_TESTING "Build with fuzzer for the server" OFF)
option(PICKY_DEVELOPER "Build with picky developer flags" OFF)
+option(WITH_STATIC_LIB "Build static library" ON)
if (WITH_ZLIB)
set(WITH_LIBZ ON)
@@ -53,3 +54,7 @@ endif (NOT GLOBAL_BIND_CONFIG)
if (NOT GLOBAL_CLIENT_CONFIG)
set(GLOBAL_CLIENT_CONFIG "/etc/ssh/ssh_config")
endif (NOT GLOBAL_CLIENT_CONFIG)
+
+if (WITH_STATIC_LIB)
+ set(BUILD_STATIC_LIB ON)
+endif()
diff --git a/include/libssh/sftp.h b/include/libssh/sftp.h
index 8c14b21d..95ac1d6b 100644
--- a/include/libssh/sftp.h
+++ b/include/libssh/sftp.h
@@ -565,6 +565,9 @@ LIBSSH_API int sftp_async_read(sftp_file file, void *data, uint32_t len, uint32_
*/
LIBSSH_API ssize_t sftp_write(sftp_file file, const void *buf, size_t count);
+LIBSSH_API int sftp_async_write(sftp_file file, const void *buf, size_t count, uint32_t* id);
+LIBSSH_API int sftp_async_write_end(sftp_file file, uint32_t id, int blocking);
+
/**
* @brief Seek to a specific location in a file.
*
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index a576cf71..303a1c7f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -412,6 +412,10 @@ if (BUILD_STATIC_LIB)
if (WIN32)
target_compile_definitions(ssh-static PUBLIC "LIBSSH_STATIC")
endif (WIN32)
+ install(TARGETS ssh-static
+ EXPORT libssh-config
+ LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
+ COMPONENT libraries)
endif (BUILD_STATIC_LIB)
message(STATUS "Threads_FOUND=${Threads_FOUND}")
diff --git a/src/sftp.c b/src/sftp.c
index a8346040..a4261ec9 100644
--- a/src/sftp.c
+++ b/src/sftp.c
@@ -2234,6 +2234,102 @@ ssize_t sftp_write(sftp_file file, const void *buf, size_t count) {
return -1; /* not reached */
}
+/*
+ * sftp_async_write and sftp_async_write_end are copied from
+ * https://github.com/limes-datentechnik-gmbh/libssh
+ */
+int sftp_async_write(sftp_file file, const void *buf, size_t count, uint32_t* id) {
+ sftp_session sftp = file->sftp;
+ ssh_buffer buffer;
+ int len;
+ int packetlen;
+ int rc;
+
+ buffer = ssh_buffer_new();
+ if (buffer == NULL) {
+ ssh_set_error_oom(sftp->session);
+ return -1;
+ }
+
+ *id = sftp_get_new_id(file->sftp);
+
+ rc = ssh_buffer_pack(buffer,
+ "dSqdP",
+ *id,
+ file->handle,
+ file->offset,
+ count, /* len of datastring */
+ (size_t)count, buf);
+ if (rc != SSH_OK){
+ ssh_set_error_oom(sftp->session);
+ ssh_buffer_free(buffer);
+ return SSH_ERROR;
+ }
+ packetlen=ssh_buffer_get_len(buffer)+5;
+ len = sftp_packet_write(file->sftp, SSH_FXP_WRITE, buffer);
+ ssh_buffer_free(buffer);
+ if (len < 0) {
+ return SSH_ERROR;
+ } else if (len != packetlen) {
+ ssh_set_error(sftp->session, SSH_FATAL,
+ "Could only send %d of %d bytes to remote host!", len, packetlen);
+ SSH_LOG(SSH_LOG_PACKET,
+ "Could not write as much data as expected");
+ return SSH_ERROR;
+ }
+
+ file->offset += count;
+
+ return SSH_OK;
+}
+
+int sftp_async_write_end(sftp_file file, uint32_t id, int blocking) {
+ sftp_session sftp = file->sftp;
+ sftp_message msg = NULL;
+ sftp_status_message status;
+
+ msg = sftp_dequeue(sftp, id);
+ while (msg == NULL) {
+ if (!blocking && ssh_channel_poll(sftp->channel, 0) == 0) {
+ /* we cannot block */
+ return SSH_AGAIN;
+ }
+ if (sftp_read_and_dispatch(sftp) < 0) {
+ /* something nasty has happened */
+ return SSH_ERROR;
+ }
+ msg = sftp_dequeue(sftp, id);
+ }
+
+ switch (msg->packet_type) {
+ case SSH_FXP_STATUS:
+ status = parse_status_msg(msg);
+ sftp_message_free(msg);
+ if (status == NULL) {
+ return SSH_ERROR;
+ }
+ sftp_set_error(sftp, status->status);
+ switch (status->status) {
+ case SSH_FX_OK:
+ status_msg_free(status);
+ return SSH_OK;
+ default:
+ break;
+ }
+ ssh_set_error(sftp->session, SSH_REQUEST_DENIED,
+ "SFTP server: %s", status->errormsg);
+ status_msg_free(status);
+ return SSH_ERROR;
+ default:
+ ssh_set_error(sftp->session, SSH_FATAL,
+ "Received message %d during write!", msg->packet_type);
+ sftp_message_free(msg);
+ return SSH_ERROR;
+ }
+
+ return SSH_ERROR; /* not reached */
+}
+
/* Seek to a specific location in a file. */
int sftp_seek(sftp_file file, uint32_t new_offset) {
if (file == NULL) {

View File

@@ -55,4 +55,17 @@ static inline void lock_release(lock *l)
}
}
static inline void lock_release_via_cleanup(void *l)
{
lock_release(l);
}
#define LOCK_ACQUIRE_THREAD(l) \
lock_acquire(l); \
pthread_cleanup_push(lock_release_via_cleanup, l)
#define LOCK_RELEASE_THREAD(l) \
pthread_cleanup_pop(1)
#endif /* _ATOMIC_H_ */

View File

@@ -492,7 +492,7 @@ int chunk_fill(struct list_head *file_list, struct list_head *chunk_list,
chunk_sz = max_chunk_sz;
else {
chunk_sz = (f->size - (f->size % nr_conn)) / nr_conn;
chunk_sz &= ~page_mask; /* align in page_sz */
chunk_sz &= ~page_mask; /* align with page_sz */
if (chunk_sz <= min_chunk_sz)
chunk_sz = min_chunk_sz;
}
@@ -554,7 +554,7 @@ int chunk_prepare(struct chunk *c, sftp_session sftp)
struct file *f = c->f;
int ret = 0;
lock_acquire(&f->lock); /* XXX: is always acquiring lock per-chunk heavy? */
LOCK_ACQUIRE_THREAD(&f->lock);
if (f->state == FILE_STATE_INIT) {
if (file_dst_prepare(f, f->dst_is_remote ? sftp : NULL) < 0) {
ret = -1;
@@ -565,11 +565,11 @@ int chunk_prepare(struct chunk *c, sftp_session sftp)
}
out:
lock_release(&f->lock);
LOCK_RELEASE_THREAD();
return ret;
}
static mode_t chunk_get_mode(const char *path, sftp_session sftp)
static mode_t file_get_mode(const char *path, sftp_session sftp)
{
mode_t mode;
@@ -592,7 +592,7 @@ static mode_t chunk_get_mode(const char *path, sftp_session sftp)
return mode;
}
static int chunk_set_mode(const char *path, mode_t mode, sftp_session sftp)
static int file_set_mode(const char *path, mode_t mode, sftp_session sftp)
{
if (sftp) {
if (sftp_chmod(sftp, path, mode) < 0) {
@@ -647,50 +647,16 @@ static sftp_file chunk_open_remote(const char *path, int flags, mode_t mode, siz
return sf;
}
/*
* TODO: handle case when read returns 0 (EOF).
*/
static int _chunk_copy_local_to_remote(struct chunk *c, int fd, sftp_file sf,
size_t sftp_buf_sz, size_t io_buf_sz,
size_t *counter)
static ssize_t read_to_buf(void *ptr, size_t len, void *userdata)
{
ssize_t read_bytes, write_bytes, remaind;
char buf[io_buf_sz];
for (remaind = c->len; remaind > 0;) {
read_bytes = read(fd, buf, min(remaind, io_buf_sz));
if (read_bytes < 0) {
pr_err("read: %s\n", strerrno());
return -1;
}
write_bytes = sftp_write2(sf, buf, read_bytes, sftp_buf_sz);
if (write_bytes < 0) {
pr_err("sftp_write: %d\n", sftp_get_error(sf->sftp));
return -1;
}
if (write_bytes < read_bytes) {
pr_err("failed to write full bytes to %s\n", c->f->dst_path);
return -1;
}
*counter += write_bytes;
remaind -= write_bytes;
}
return 0;
int fd = *((int *)userdata);
return read(fd, ptr, len);
}
#define XFER_BUF_SIZE 16384
#ifdef ASYNC_WRITE
static int _chunk_copy_local_to_remote_async(struct chunk *c, int fd,
sftp_file sf, int nr_ahead, size_t *counter)
static int chunk_copy_local_to_remote_async(struct chunk *c, int fd, sftp_file sf,
int nr_ahead, int buf_sz, size_t *counter)
{
ssize_t read_bytes, remaind, thrown;
char buf[XFER_BUF_SIZE];
int idx, ret;
struct {
uint32_t id;
@@ -702,15 +668,12 @@ static int _chunk_copy_local_to_remote_async(struct chunk *c, int fd,
remaind = thrown = c->len;
for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
reqs[idx].len = min(thrown, sizeof(buf));
read_bytes = read(fd, buf, reqs[idx].len);
if (read_bytes < 0) {
pr_err("read: %s\n", strerrno());
return -1;
}
ret = sftp_async_write(sf, buf, reqs[idx].len, &reqs[idx].id);
if (ret < 0) {
pr_err("sftp_async_write: %d\n", sftp_get_error(sf->sftp));
reqs[idx].len = min(thrown, buf_sz);
reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
&reqs[idx].id);
if (reqs[idx].len < 0) {
pr_err("sftp_async_write: %d or %s\n",
sftp_get_error(sf->sftp), strerrno());
return -1;
}
thrown -= reqs[idx].len;
@@ -732,51 +695,38 @@ static int _chunk_copy_local_to_remote_async(struct chunk *c, int fd,
if (thrown <= 0)
continue;
reqs[idx].len = min(thrown, sizeof(buf));
read_bytes = read(fd, buf, reqs[idx].len);
if (read_bytes < 0) {
pr_err("read: %s\n", strerrno());
return -1;
}
ret = sftp_async_write(sf, buf, reqs[idx].len, &reqs[idx].id);
if (ret < 0) {
pr_err("sftp_async_write: %d\n", sftp_get_error(sf->sftp));
reqs[idx].len = min(thrown, buf_sz);
reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
&reqs[idx].id);
if (reqs[idx].len < 0) {
pr_err("sftp_async_write: %d or %s\n",
sftp_get_error(sf->sftp), strerrno());
return -1;
}
thrown -= reqs[idx].len;
}
if (remaind < 0) {
pr_err("invalid remaind bytes %ld. last async_write_end bytes %lu. "
"last read bytes %ld\n",
remaind, reqs[idx].len, read_bytes);
pr_err("invalid remaind bytes %ld. last async_write_end bytes %lu.",
remaind, reqs[idx].len);
return -1;
}
return 0;
}
#endif
static int _chunk_copy_remote_to_local(struct chunk *c, int fd, sftp_file sf,
int nr_ahead, size_t *counter)
static int chunk_copy_remote_to_local_async(struct chunk *c, int fd, sftp_file sf,
int nr_ahead, int buf_sz, size_t *counter)
{
ssize_t read_bytes, write_bytes, remaind, thrown;
char buf[XFER_BUF_SIZE];
char buf[buf_sz];
int idx;
struct {
int id;
ssize_t len;
} reqs[nr_ahead];
/* TODO: sftp_buf_sz has no effect on remote to local copy. we
* always use 16384 byte buffer pointed by
* https://api.libssh.org/stable/libssh_tutor_sftp.html. The
* larget read length from sftp_async_read is 65536 byte.
* Read sizes larget than 65536 cause a situation where data
* remainds but sftp_async_read returns 0.
*/
if (c->len == 0)
return 0;
@@ -832,8 +782,7 @@ static int _chunk_copy_remote_to_local(struct chunk *c, int fd, sftp_file sf,
}
static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp,
size_t sftp_buf_sz, size_t io_buf_sz,
int nr_ahead, size_t *counter)
int nr_ahead, int buf_sz, size_t *counter)
{
struct file *f = c->f;
sftp_file sf = NULL;
@@ -856,23 +805,9 @@ static int chunk_copy_local_to_remote(struct chunk *c, sftp_session sftp,
goto out;
}
#ifndef ASYNC_WRITE
ret = _chunk_copy_local_to_remote(c, fd, sf, sftp_buf_sz, io_buf_sz,
counter);
#else
ret = _chunk_copy_local_to_remote_async(c, fd, sf, nr_ahead, counter);
#endif
ret = chunk_copy_local_to_remote_async(c, fd, sf, nr_ahead, buf_sz, counter);
if (ret < 0)
goto out;
if ((mode = chunk_get_mode(f->src_path, NULL)) < 0) {
ret = -1;
goto out;
}
if (chunk_set_mode(f->dst_path, mode, sftp) < 0) {
ret = -1;
}
out:
if (fd > 0)
close(fd);
@@ -882,7 +817,7 @@ out:
}
static int chunk_copy_remote_to_local(struct chunk *c, sftp_session sftp,
int nr_ahead, size_t *counter)
int nr_ahead, int buf_sz, size_t *counter)
{
struct file *f = c->f;
sftp_file sf = NULL;
@@ -905,7 +840,7 @@ static int chunk_copy_remote_to_local(struct chunk *c, sftp_session sftp,
goto out;
}
ret = _chunk_copy_remote_to_local(c, fd, sf, nr_ahead, counter);
ret = chunk_copy_remote_to_local_async(c, fd, sf, nr_ahead, buf_sz, counter);
if (ret< 0)
goto out;
@@ -918,43 +853,51 @@ out:
return ret;
}
static int file_cleanup(struct file *f, sftp_session sftp)
{
sftp_session s, d;
mode_t mode;
if (f->dst_is_remote) {
s = NULL;
d = sftp;
} else {
s = sftp;
d = NULL;
}
int chunk_copy(struct chunk *c, sftp_session sftp, size_t sftp_buf_sz, size_t io_buf_sz,
int nr_ahead, size_t *counter)
if ((mode = file_get_mode(f->src_path, s)) < 0)
return -1;
if (file_set_mode(f->dst_path, mode, d) < 0)
return -1;
return 0;
}
int chunk_copy(struct chunk *c, sftp_session sftp, int nr_ahead, int buf_sz,
size_t *counter)
{
struct file *f = c->f;
int ret = 0;
pr_debug("copy %s %s -> %s %s off=0x%010lx\n",
f->src_path, strloc(f->src_is_remote),
f->dst_path, strloc(f->dst_is_remote), c->off);
pprint4("copy start: chunk %s 0x%010lx-0x%010lx %luB\n",
c->f->src_path, c->off, c->off + c->len, c->len);
if (f->dst_is_remote)
ret = chunk_copy_local_to_remote(c, sftp, sftp_buf_sz, io_buf_sz,
nr_ahead, counter);
ret = chunk_copy_local_to_remote(c, sftp, nr_ahead, buf_sz, counter);
else
ret = chunk_copy_remote_to_local(c, sftp, nr_ahead, counter);
ret = chunk_copy_remote_to_local(c, sftp, nr_ahead, buf_sz, counter);
if (ret < 0)
return ret;
pr_debug("done %s %s -> %s %s off=0x%010lx\n",
f->src_path, strloc(f->src_is_remote),
f->dst_path, strloc(f->dst_is_remote), c->off);
pprint4("copy done: chunk %s 0x%010lx-0x%010lx %luB\n",
c->f->src_path, c->off, c->off + c->len, c->len);
if (refcnt_dec(&f->refcnt) == 0) {
f->state = FILE_STATE_DONE;
pprint2("copy done: %s\n", f->src_path);
ret = file_cleanup(f, sftp);
}
return ret;
}

View File

@@ -72,8 +72,9 @@ int chunk_fill(struct list_head *file_list, struct list_head *chunk_list,
struct chunk *chunk_acquire(struct list_head *chunk_list);
int chunk_prepare(struct chunk *c, sftp_session sftp);
int chunk_copy(struct chunk *c, sftp_session sftp, size_t sftp_buf_sz, size_t io_buf_sz,
int nr_ahead, size_t *counter);
int chunk_copy(struct chunk *c, sftp_session sftp, int nr_ahead, int buf_sz,
size_t *counter);
#ifdef DEBUG
void file_dump(struct list_head *file_list);

View File

@@ -23,30 +23,18 @@
#endif
#define DEFAULT_MIN_CHUNK_SZ (64 << 20) /* 64MB */
#define DEFAULT_SFTP_BUF_SZ 131072 /* derived from qemu/block/ssh.c */
#define DEFAULT_IO_BUF_SZ DEFAULT_SFTP_BUF_SZ
/* XXX: need to investigate max buf size for sftp_read/sftp_write */
#define DEFAULT_NR_AHEAD 16
#define DEFAULT_NR_AHEAD 32
#define DEFAULT_BUF_SZ 16384
/* XXX: we use 16384 byte buffer pointed by
* https://api.libssh.org/stable/libssh_tutor_sftp.html. The larget
* read length from sftp_async_read is 65536 byte. Read sizes larger
* than 65536 cause a situation where data remainds but
* sftp_async_read returns 0.
*/
struct mscp {
char *host; /* remote host (and username) */
struct ssh_opts *opts; /* ssh parameters */
sftp_session ctrl; /* control sftp session */
struct list_head file_list;
struct list_head chunk_list; /* stack of chunks */
lock chunk_lock; /* lock for chunk list */
char *target;
int sftp_buf_sz, io_buf_sz;
int nr_ahead; /* # of ahead read command for remote to local copy */
struct timeval start; /* timestamp of starting copy */
};
struct mscp_thread {
struct mscp *mscp;
sftp_session sftp;
pthread_t tid;
@@ -56,52 +44,60 @@ struct mscp_thread {
int ret;
};
void *mscp_copy_thread(void *arg);
void *mscp_monitor_thread(void *arg);
struct mscp {
char *host; /* remote host (and username) */
struct ssh_opts *opts; /* ssh parameters */
struct list_head file_list;
struct list_head chunk_list; /* stack of chunks */
lock chunk_lock; /* lock for chunk list */
char *target;
int nr_threads; /* number of threads */
int buf_sz; /* i/o buf size */
int nr_ahead; /* # of ahead read command for remote to local copy */
struct mscp_thread *threads;
} m;
void *mscp_copy_thread(void *arg);
int mscp_stat_init();
void mscp_stat_final();
pthread_t mtid;
struct mscp_thread *threads;
int nr_threads;
void stop_copy_threads(int sig)
{
int n;
pr("stopping...\n");
for (n = 0; n < nr_threads; n++) {
if (!threads[n].finished)
pthread_cancel(threads[n].tid);
for (n = 0; n < m.nr_threads; n++) {
if (m.threads[n].tid && !m.threads[n].finished)
pthread_cancel(m.threads[n].tid);
}
}
void usage(bool print_help) {
printf("mscp v" VERSION ": copy files over multiple ssh connections\n"
"\n"
"Usage: mscp [vqDCHdh] [-n nr_conns] [-m coremask]\n"
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead]\n"
#ifndef ASYNC_WRITE
" [-b sftp_buf_sz] [-B io_buf_sz] \n"
#endif
"Usage: mscp [vqDCHdNh] [-n nr_conns] [-m coremask]\n"
" [-s min_chunk_sz] [-S max_chunk_sz] [-a nr_ahead] [-b buf_sz]\n"
" [-l login_name] [-p port] [-i identity_file]\n"
" [-c cipher_spec] source ... target\n"
" [-c cipher_spec] [-M hmac_spec] source ... target\n"
"\n");
if (!print_help)
return;
printf(" -n NR_CONNECTIONS number of connections (default: half of # of cpu cores)\n"
printf(" -n NR_CONNECTIONS number of connections "
"(default: floor(log(cores)*2)+1)\n"
" -m COREMASK hex value to specify cores where threads pinned\n"
" -s MIN_CHUNK_SIZE min chunk size (default: 64MB)\n"
" -S MAX_CHUNK_SIZE max chunk size (default: filesize / nr_conn)\n"
" -S MAX_CHUNK_SIZE max chunk size (default: filesize/nr_conn)\n"
"\n"
" -a NR_AHEAD number of inflight SFTP commands (default: 16)\n"
#ifndef ASYNC_WRITE
" -b SFTP_BUF_SIZE buf size for sftp_read/write (default 131072B)\n"
" -B IO_BUF_SIZE buf size for read/write (default 131072B)\n"
" Note that the default value is derived from\n"
" qemu/block/ssh.c. need investigation...\n"
" -b and -B affect only local to remote copy\n"
#endif
" -a NR_AHEAD number of inflight SFTP commands (default: 32)\n"
" -b BUF_SZ buffer size for i/o and transfer\n"
"\n"
" -v increment verbose output level\n"
" -q disable output\n"
@@ -110,10 +106,12 @@ void usage(bool print_help) {
" -l LOGIN_NAME login name\n"
" -p PORT port number\n"
" -i IDENTITY identity file for public key authentication\n"
" -c CIPHER cipher spec, see `ssh -Q cipher`\n"
" -c CIPHER cipher spec\n"
" -M HMAC hmac spec\n"
" -C enable compression on libssh\n"
" -H disable hostkey check\n"
" -d increment ssh debug output level\n"
" -N disable tcp nodelay (default on)\n"
" -h print this help\n"
"\n");
}
@@ -167,6 +165,7 @@ int expand_coremask(const char *coremask, int **cores, int *nr_cores)
char c[2] = { 'x', '\0' };
const char *_coremask;
long v, needle;
int ncores = nr_cpus();
/*
* This function returns array of usable cores in `cores` and
@@ -197,6 +196,8 @@ int expand_coremask(const char *coremask, int **cores, int *nr_cores)
for (needle = 0x01; needle < 0x10; needle <<= 1) {
nr_all++;
if (nr_all > ncores)
break; /* too long coremask */
if (v & needle) {
nr_usable++;
core_list = realloc(core_list, sizeof(int) * nr_usable);
@@ -219,10 +220,15 @@ int expand_coremask(const char *coremask, int **cores, int *nr_cores)
return 0;
}
int default_nr_threads()
{
return (int)(floor(log(nr_cpus()) * 2) + 1);
}
int main(int argc, char **argv)
{
struct mscp m;
struct ssh_opts opts;
sftp_session ctrl;
int min_chunk_sz = DEFAULT_MIN_CHUNK_SZ;
int max_chunk_sz = 0;
char *coremask = NULL;;
@@ -233,22 +239,20 @@ int main(int argc, char **argv)
char ch;
memset(&opts, 0, sizeof(opts));
opts.nodelay = 1;
memset(&m, 0, sizeof(m));
INIT_LIST_HEAD(&m.file_list);
INIT_LIST_HEAD(&m.chunk_list);
lock_init(&m.chunk_lock);
m.sftp_buf_sz = DEFAULT_SFTP_BUF_SZ;
m.io_buf_sz = DEFAULT_IO_BUF_SZ;
m.nr_ahead = DEFAULT_NR_AHEAD;
m.buf_sz = DEFAULT_BUF_SZ;
m.nr_threads = default_nr_threads();
nr_threads = (int)(nr_cpus() / 2);
nr_threads = nr_threads == 0 ? 1 : nr_threads;
while ((ch = getopt(argc, argv, "n:m:s:S:b:B:a:vqDl:p:i:c:CHdh")) != -1) {
while ((ch = getopt(argc, argv, "n:m:s:S:a:b:vqDl:p:i:c:M:CHdNh")) != -1) {
switch (ch) {
case 'n':
nr_threads = atoi(optarg);
if (nr_threads < 1) {
m.nr_threads = atoi(optarg);
if (m.nr_threads < 1) {
pr_err("invalid number of connections: %s\n", optarg);
return 1;
}
@@ -286,20 +290,6 @@ int main(int argc, char **argv)
return -1;
}
break;
case 'b':
m.sftp_buf_sz = atoi(optarg);
if (m.sftp_buf_sz < 1) {
pr_err("invalid buffer size: %s\n", optarg);
return -1;
}
break;
case 'B':
m.io_buf_sz = atoi(optarg);
if (m.io_buf_sz < 1) {
pr_err("invalid buffer size: %s\n", optarg);
return -1;
}
break;
case 'a':
m.nr_ahead = atoi(optarg);
if (m.nr_ahead < 1) {
@@ -307,6 +297,13 @@ int main(int argc, char **argv)
return -1;
}
break;
case 'b':
m.buf_sz = atoi(optarg);
if (m.buf_sz < 1) {
pr_err("invalid buffer size: %s\n", optarg);
return -1;
}
break;
case 'v':
verbose++;
break;
@@ -328,6 +325,9 @@ int main(int argc, char **argv)
case 'c':
opts.cipher = optarg;
break;
case 'M':
opts.hmac = optarg;
break;
case 'C':
opts.compress++;
break;
@@ -337,6 +337,9 @@ int main(int argc, char **argv)
case 'd':
opts.debuglevel++;
break;
case 'N':
opts.nodelay = 0;
break;
case 'h':
usage(true);
return 0;
@@ -370,6 +373,7 @@ int main(int argc, char **argv)
pprint(2, " %d", cores[n]);
pprint(2, "\n");
}
pprint2("number of connections: %d\n", m.nr_threads);
/* create control session */
m.host = find_hostname(optind, argc, argv);
@@ -378,15 +382,14 @@ int main(int argc, char **argv)
return 1;
}
pprint3("connecting to %s for checking destinations...\n", m.host);
m.ctrl = ssh_make_sftp_session(m.host, &opts);
if (!m.ctrl)
ctrl = ssh_init_sftp_session(m.host, &opts);
if (!ctrl)
return 1;
m.opts = &opts; /* save ssh-able ssh_opts */
/* fill file list */
ret = file_fill(m.ctrl, &m.file_list, &argv[optind], argc - optind - 1,
m.target);
ret = file_fill(ctrl, &m.file_list, &argv[optind], argc - optind - 1, m.target);
if (ret < 0)
goto out;
@@ -396,7 +399,7 @@ int main(int argc, char **argv)
/* fill chunk list */
ret = chunk_fill(&m.file_list, &m.chunk_list,
nr_threads, min_chunk_sz, max_chunk_sz);
m.nr_threads, min_chunk_sz, max_chunk_sz);
if (ret < 0)
goto out;
@@ -404,56 +407,50 @@ int main(int argc, char **argv)
chunk_dump(&m.chunk_list);
#endif
if (dryrun)
if (dryrun) {
ssh_sftp_close(ctrl);
return 0;
/* prepare thread instances */
if ((n = list_count(&m.chunk_list)) < nr_threads) {
pprint3("we have only %d chunk(s). set nr_conns to %d\n", n, n);
nr_threads = n;
}
threads = calloc(nr_threads, sizeof(struct mscp_thread));
memset(threads, 0, nr_threads * sizeof(struct mscp_thread));
for (n = 0; n < nr_threads; n++) {
struct mscp_thread *t = &threads[n];
t->mscp = &m;
/* prepare thread instances */
if ((n = list_count(&m.chunk_list)) < m.nr_threads) {
pprint2("we have only %d chunk(s). "
"set number of connections to %d\n", n, n);
m.nr_threads = n;
}
m.threads = calloc(m.nr_threads, sizeof(struct mscp_thread));
memset(m.threads, 0, m.nr_threads * sizeof(struct mscp_thread));
for (n = 0; n < m.nr_threads; n++) {
struct mscp_thread *t = &m.threads[n];
t->finished = false;
if (!coremask)
t->cpu = -1;
else
t->cpu = cores[n % nr_cores];
pprint3("connecting to %s for a copy thread...\n", m.host);
t->sftp = ssh_make_sftp_session(m.host, m.opts);
if (n == 0) {
t->sftp = ctrl; /* reuse ctrl sftp session */
ctrl = NULL;
} else {
pprint3("connecting to %s for a copy thread...\n", m.host);
t->sftp = ssh_init_sftp_session(m.host, m.opts);
}
if (!t->sftp) {
ret = 1;
goto join_out;
goto out;
}
}
/* spawn count thread */
ret = pthread_create(&mtid, NULL, mscp_monitor_thread, &m);
if (ret < 0) {
pr_err("pthread_create error: %d\n", ret);
stop_copy_threads(0);
ret = 1;
goto join_out;
}
/* register SIGINT to stop threads */
if (signal(SIGINT, stop_copy_threads) == SIG_ERR) {
pr_err("cannot set signal: %s\n", strerrno());
/* init mscp stat for printing progress bar */
if (mscp_stat_init() < 0) {
ret = 1;
goto out;
}
/* save start time */
gettimeofday(&m.start, NULL);
/* spawn threads */
for (n = 0; n < nr_threads; n++) {
struct mscp_thread *t = &threads[n];
/* spawn copy threads */
for (n = 0; n < m.nr_threads; n++) {
struct mscp_thread *t = &m.threads[n];
ret = pthread_create(&t->tid, NULL, mscp_copy_thread, t);
if (ret < 0) {
pr_err("pthread_create error: %d\n", ret);
@@ -463,23 +460,37 @@ int main(int argc, char **argv)
}
}
join_out:
/* waiting for threads join... */
for (n = 0; n < nr_threads; n++)
if (threads[n].tid) {
pthread_join(threads[n].tid, NULL);
if (threads[n].ret < 0)
ret = threads[n].ret;
}
if (mtid != 0) {
pthread_cancel(mtid);
pthread_join(mtid, NULL);
/* register SIGINT to stop threads */
if (signal(SIGINT, stop_copy_threads) == SIG_ERR) {
pr_err("cannot set signal: %s\n", strerrno());
ret = 1;
goto out;
}
join_out:
/* waiting for threads join... */
for (n = 0; n < m.nr_threads; n++) {
if (m.threads[n].tid) {
pthread_join(m.threads[n].tid, NULL);
if (m.threads[n].ret < 0)
ret = m.threads[n].ret;
}
}
/* print final result */
mscp_stat_final();
out:
if (m.ctrl)
ssh_sftp_close(m.ctrl);
if (ctrl)
ssh_sftp_close(ctrl);
if (m.threads) {
for (n = 0; n < m.nr_threads; n++) {
struct mscp_thread *t = &m.threads[n];
if (t->sftp)
ssh_sftp_close(t->sftp);
}
}
return ret;
}
@@ -487,20 +498,12 @@ out:
void mscp_copy_thread_cleanup(void *arg)
{
struct mscp_thread *t = arg;
if (t->sftp) {
/* XXX: sftp_free --> ssh_poll sometimes blocked with
* no responses. So wet nonblocking. */
ssh_set_blocking(sftp_ssh(t->sftp), 1);
ssh_sftp_close(t->sftp);
}
t->finished = true;
__sync_synchronize();
}
void *mscp_copy_thread(void *arg)
{
struct mscp_thread *t = arg;
struct mscp *m = t->mscp;
sftp_session sftp = t->sftp;
struct chunk *c;
@@ -514,9 +517,9 @@ void *mscp_copy_thread(void *arg)
pthread_cleanup_push(mscp_copy_thread_cleanup, t);
while (1) {
lock_acquire(&m->chunk_lock);
c = chunk_acquire(&m->chunk_list);
lock_release(&m->chunk_lock);
LOCK_ACQUIRE_THREAD(&m.chunk_lock);
c = chunk_acquire(&m.chunk_list);
LOCK_RELEASE_THREAD();
if (!c)
break; /* no more chunks */
@@ -524,8 +527,7 @@ void *mscp_copy_thread(void *arg)
if ((t->ret = chunk_prepare(c, sftp)) < 0)
break;
if ((t->ret = chunk_copy(c, sftp, m->sftp_buf_sz, m->io_buf_sz,
m->nr_ahead, &t->done)) < 0)
if ((t->ret = chunk_copy(c, sftp, m.nr_ahead, m.buf_sz, &t->done)) < 0)
break;
}
@@ -538,7 +540,10 @@ void *mscp_copy_thread(void *arg)
return NULL;
}
static double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
/* progress bar-related functions */
double calculate_timedelta(struct timeval *b, struct timeval *a)
{
double sec, usec;
@@ -551,27 +556,48 @@ static double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
usec = a->tv_usec - b->tv_usec;
sec += usec / 1000000;
return (double)diff / sec;
return sec;
}
static void print_progress_bar(double percent, char *suffix)
double calculate_bps(size_t diff, struct timeval *b, struct timeval *a)
{
return (double)diff / calculate_timedelta(b, a);
}
char *calculate_eta(size_t remain, size_t diff, struct timeval *b, struct timeval *a)
{
static char buf[16];
double elapsed = calculate_timedelta(b, a);
double eta;
if (diff == 0)
snprintf(buf, sizeof(buf), "--:-- ETA");
else {
eta = remain / (diff / elapsed);
snprintf(buf, sizeof(buf), "%02d:%02d ETA",
(int)floor(eta / 60), (int)round(eta) % 60);
}
return buf;
}
void print_progress_bar(double percent, char *suffix)
{
int n, thresh, bar_width;
struct winsize ws;
char buf[128];
/*
* [=======> ] XX.X% SUFFIX
* [=======> ] XX% SUFFIX
*/
buf[0] = '\0';
if (ioctl(STDOUT_FILENO, TIOCGWINSZ, &ws) < 0)
return; /* XXX */
bar_width = min(sizeof(buf), ws.ws_col) - strlen(suffix) - 8;
bar_width = min(sizeof(buf), ws.ws_col) - strlen(suffix) - 7;
memset(buf, 0, sizeof(buf));
if (bar_width > 8) {
memset(buf, 0, sizeof(buf));
thresh = floor(bar_width * (percent / 100)) - 1;
for (n = 1; n < bar_width - 1; n++) {
@@ -590,8 +616,8 @@ static void print_progress_bar(double percent, char *suffix)
pprint1("%s%s", buf, suffix);
}
static void print_progress(struct timeval *start, struct timeval *end,
size_t total, size_t last, size_t done)
void print_progress(struct timeval *b, struct timeval *a,
size_t total, size_t last, size_t done)
{
char *bps_units[] = { "B/s ", "KB/s", "MB/s", "GB/s" };
char *byte_units[] = { "B ", "KB", "MB", "GB", "TB", "PB" };
@@ -613,7 +639,7 @@ static void print_progress(struct timeval *start, struct timeval *end,
byte_tu++)
total_round /= 1024;
bps = calculate_bps(done - last, start, end);
bps = calculate_bps(done - last, b, a);
for (bps_u = 0; bps > 1000 && bps_u < array_size(bps_units); bps_u++)
bps /= 1000;
@@ -624,83 +650,63 @@ static void print_progress(struct timeval *start, struct timeval *end,
byte_du++)
done_round /= 1024;
snprintf(suffix, sizeof(suffix), "%lu%s/%lu%s %6.1f%s ",
snprintf(suffix, sizeof(suffix), "%4lu%s/%lu%s %6.1f%s %s",
done_round, byte_units[byte_du], total_round, byte_units[byte_tu],
bps, bps_units[bps_u]);
bps, bps_units[bps_u], calculate_eta(total - done, done - last, b, a));
print_progress_bar(percent, suffix);
}
void mscp_monitor_thread_cleanup(void *arg)
struct mscp_stat {
struct timeval start, before, after;
size_t total;
size_t last;
size_t done;
} s;
void mscp_stat_handler(int signum)
{
struct mscp *m = arg;
struct timeval end;
struct file *f;
size_t total, done;
int n;
total = done = 0;
for (s.done = 0, n = 0; n < m.nr_threads; n++)
s.done += m.threads[n].done;
gettimeofday(&end, NULL);
/* get total byte to be transferred */
list_for_each_entry(f, &m->file_list, list) {
total += f->size;
gettimeofday(&s.after, NULL);
if (signum == SIGALRM) {
alarm(1);
print_progress(&s.before, &s.after, s.total, s.last, s.done);
s.before = s.after;
s.last = s.done;
} else {
/* called from mscp_stat_final. calculate progress from the beginning */
print_progress(&s.start, &s.after, s.total, 0, s.done);
}
/* get total byte transferred */
for (n = 0; n < nr_threads; n++) {
done += threads[n].done;
}
print_progress(&m->start, &end, total, 0, done);
fputs("\n", stdout); /* the final ouput. we need \n */
}
void *mscp_monitor_thread(void *arg)
int mscp_stat_init()
{
struct mscp *m = arg;
struct timeval a, b;
struct file *f;
bool all_done;
size_t total, done, last;
int n;
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
pthread_cleanup_push(mscp_monitor_thread_cleanup, m);
/* get total byte to be transferred */
total = 0;
list_for_each_entry(f, &m->file_list, list) {
total += f->size;
memset(&s, 0, sizeof(s));
list_for_each_entry(f, &m.file_list, list) {
s.total += f->size;
}
while (1) {
all_done = true;
last = done = 0;
for (n = 0; n < nr_threads; n++) {
last += threads[n].done;
}
gettimeofday(&b, NULL);
usleep(1000000);
for (n = 0; n < nr_threads; n++) {
done += threads[n].done;
if (!threads[n].finished)
all_done = false;
}
gettimeofday(&a, NULL);
print_progress(&b, &a, total, last, done);
if (all_done || total == done)
break;
if (signal(SIGALRM, mscp_stat_handler) == SIG_ERR) {
pr_err("signal: %s\n", strerrno());
return -1;
}
pthread_cleanup_pop(1);
gettimeofday(&s.start, NULL);
s.before = s.start;
alarm(1);
return NULL;
return 0;
}
void mscp_stat_final()
{
alarm(0);
mscp_stat_handler(0);
}

127
src/ssh.c
View File

@@ -1,6 +1,9 @@
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include "libssh/callbacks.h"
#include <ssh.h>
#include <util.h>
@@ -32,11 +35,23 @@ static int ssh_set_opts(ssh_session ssh, struct ssh_opts *opts)
if (opts->cipher) {
if (ssh_options_set(ssh, SSH_OPTIONS_CIPHERS_C_S, opts->cipher) < 0) {
pr_err("failed to set cipher client to server\n");
pr_err("failed to set cipher for client to server\n");
return -1;
}
if (ssh_options_set(ssh, SSH_OPTIONS_CIPHERS_S_C, opts->cipher) < 0) {
pr_err("failed to set cipher client to server\n");
pr_err("failed to set cipher for server to client\n");
return -1;
}
}
if (opts->hmac) {
pr_warn("%s\n", opts->hmac);
if (ssh_options_set(ssh, SSH_OPTIONS_HMAC_C_S, opts->hmac) < 0) {
pr_err("failed to set hmac for client to server\n");
return -1;
}
if (ssh_options_set(ssh, SSH_OPTIONS_HMAC_S_C, opts->hmac) < 0) {
pr_err("failed to set hmac for server to client\n");
return -1;
}
}
@@ -47,6 +62,12 @@ static int ssh_set_opts(ssh_session ssh, struct ssh_opts *opts)
return -1;
}
if (opts->nodelay &&
ssh_options_set(ssh, SSH_OPTIONS_NODELAY, &opts->nodelay) < 0) {
pr_err("failed to set nodelay\n");
return -1;
}
return 0;
}
@@ -63,31 +84,79 @@ static int ssh_authenticate(ssh_session ssh, struct ssh_opts *opts)
auth_bit_mask = ssh_userauth_list(ssh, NULL);
if (auth_bit_mask & SSH_AUTH_METHOD_NONE &&
ssh_userauth_none(ssh, NULL) == SSH_AUTH_SUCCESS) {
ssh_userauth_none(ssh, NULL) == SSH_AUTH_SUCCESS)
return 0;
}
if (auth_bit_mask & SSH_AUTH_METHOD_PUBLICKEY &&
ssh_userauth_publickey_auto(ssh, NULL, NULL) == SSH_AUTH_SUCCESS) {
ssh_userauth_publickey_auto(ssh, NULL, opts->passphrase) == SSH_AUTH_SUCCESS)
return 0;
}
if (auth_bit_mask & SSH_AUTH_METHOD_PASSWORD) {
if (!opts->password) {
opts->password = getpass("Password: ");
opts->password = malloc(PASSWORD_BUF_SZ);
if (!opts->password) {
pr_err("malloc: %s\n", strerrno());
return -1;
}
memset(opts->password, 0, PASSWORD_BUF_SZ);
if (ssh_getpass("Password: ", opts->password, PASSWORD_BUF_SZ,
0, 0) < 0) {
return -1;
}
}
if (ssh_userauth_password(ssh, NULL, opts->password) == SSH_AUTH_SUCCESS)
return 0;
}
pr_err("authentication failure: %s\n", ssh_get_error(ssh));
return -1;
}
static ssh_session ssh_make_ssh_session(char *sshdst, struct ssh_opts *opts)
static int ssh_cache_passphrase(const char *prompt, char *buf, size_t len, int echo,
int verify, void *userdata)
{
struct ssh_opts *opts = userdata;
/* This function is called on the first time for importing
* priv key file with passphrase. It is not called on the
* second time or after because cached passphrase is passed
* to ssh_userauth_publickey_auto(). */
if (opts->passphrase) {
/* passphrase is cached, but this function is called.
* maybe it was an invalid passphrase? */
free(opts->passphrase);
opts->passphrase = NULL;
}
if (ssh_getpass("Passphrase: ", buf, len, echo, verify) < 0)
return -1;
/* cache the passphrase */
opts->passphrase = malloc(len);
if (!opts->passphrase) {
pr_err("malloc: %s\n", strerrno());
return -1;
}
memcpy(opts->passphrase, buf, len);
return 0;
}
static struct ssh_callbacks_struct cb = {
.auth_function = ssh_cache_passphrase,
.userdata = NULL,
};
static ssh_session ssh_init_session(char *sshdst, struct ssh_opts *opts)
{
ssh_session ssh = ssh_new();
ssh_callbacks_init(&cb);
cb.userdata = opts;
ssh_set_callbacks(ssh, &cb);
if (ssh_set_opts(ssh, opts) != 0)
goto free_out;
@@ -119,10 +188,10 @@ free_out:
return NULL;
}
sftp_session ssh_make_sftp_session(char *sshdst, struct ssh_opts *opts)
sftp_session ssh_init_sftp_session(char *sshdst, struct ssh_opts *opts)
{
sftp_session sftp;
ssh_session ssh = ssh_make_ssh_session(sshdst, opts);
ssh_session ssh = ssh_init_session(sshdst, opts);
if (!ssh) {
return NULL;
@@ -205,7 +274,7 @@ static int ssh_verify_known_hosts(ssh_session session)
case SSH_KNOWN_HOSTS_UNKNOWN:
hexa = ssh_get_hexa(hash, hlen);
fprintf(stderr,"The server is unknown. Do you trust the host key?\n");
fprintf(stderr, "The server is unknown. Do you trust the host key?\n");
fprintf(stderr, "Public key hash: %s\n", hexa);
fprintf(stderr, "(yes/no): ");
ssh_string_free_char(hexa);
@@ -240,36 +309,10 @@ static int ssh_verify_known_hosts(ssh_session session)
void ssh_sftp_close(sftp_session sftp)
{
ssh_session ssh = sftp_ssh(sftp);
sftp_free(sftp);
/* XXX: sftp_free is stuck in ssh_poll_ctx_dopoll() when build type is Release.
* skip sftp_free inappropriately...
*/
//sftp_free(sftp);
ssh_disconnect(ssh);
ssh_free(ssh);
}
ssize_t sftp_write2(sftp_file sf, const void *buf, size_t len, size_t sftp_buf_sz)
{
ssize_t ret, nbytes;
for (nbytes = 0; nbytes < len;) {
ret = sftp_write(sf, buf + nbytes,
min(len - nbytes, sftp_buf_sz));
if (ret < 0)
return ret;
nbytes += ret;
}
return nbytes;
}
ssize_t sftp_read2(sftp_file sf, void *buf, size_t len, size_t sftp_buf_sz)
{
ssize_t ret, nbytes;
for (nbytes = 0; nbytes < len;) {
ret = sftp_read(sf, buf + nbytes,
min(len - nbytes, sftp_buf_sz));
if (ret < 0)
return ret;
nbytes += ret;
}
return nbytes;
}

View File

@@ -11,24 +11,24 @@ struct ssh_opts {
char *port; /* -p */
char *identity; /* -i */
char *cipher; /* -c */
char *hmac; /* -M */
int compress; /* -C */
int nodelay; /* -N */
int debuglevel; /* -v */
bool no_hostkey_check; /* -H */
char *password; /* filled at the first connecting phase */
#define PASSWORD_BUF_SZ 128
char *password; /* password for password auth */
char *passphrase; /* passphrase for private key */
};
/* ssh_make_sftp_session() creates sftp_session. sshdst accpets
/* ssh_init_sftp_session() creates sftp_session. sshdst accpets
* user@hostname and hostname notations (by libssh).
*/
sftp_session ssh_make_sftp_session(char *sshdst, struct ssh_opts *opts);
sftp_session ssh_init_sftp_session(char *sshdst, struct ssh_opts *opts);
void ssh_sftp_close(sftp_session sftp);
#define sftp_ssh(sftp) (sftp)->session
#define sftp_get_ssh_error(sftp) ssh_get_error(sftp_ssh(sftp))
/* wrapping multiple sftp_read|write */
ssize_t sftp_write2(sftp_file sf, const void *buf, size_t len, size_t sftp_buf_sz);
ssize_t sftp_read2(sftp_file sf, void *buf, size_t len, size_t sftp_buf_sz);
#endif /* _SSH_H_ */