add ssh_buffer_new_size and ssh_buffer_add_func to libssh

sftp_async_write() with these functions reduces
  1. realloc_buffer by ssh_buffer_new_size()
  2. memcpy from read data to ssh buffer by ssh_buffer_add_func()
This commit is contained in:
Ryo Nakamura
2022-12-06 15:02:14 +09:00
parent 289293e812
commit c4ea9a1e78
2 changed files with 181 additions and 36 deletions

View File

@@ -43,15 +43,50 @@ index 068db988..5fc3c8fc 100644
+if (WITH_STATIC_LIB) +if (WITH_STATIC_LIB)
+ set(BUILD_STATIC_LIB ON) + set(BUILD_STATIC_LIB ON)
+endif() +endif()
diff --git a/include/libssh/buffer.h b/include/libssh/buffer.h
index a55a1b40..e34e075c 100644
--- a/include/libssh/buffer.h
+++ b/include/libssh/buffer.h
@@ -33,6 +33,8 @@ int ssh_buffer_add_u8(ssh_buffer buffer, uint8_t data);
int ssh_buffer_add_u16(ssh_buffer buffer, uint16_t data);
int ssh_buffer_add_u32(ssh_buffer buffer, uint32_t data);
int ssh_buffer_add_u64(ssh_buffer buffer, uint64_t data);
+ssize_t ssh_buffer_add_func(ssh_buffer buffer, ssh_add_func f, size_t max_bytes,
+ void *userdata);
int ssh_buffer_validate_length(struct ssh_buffer_struct *buffer, size_t len);
diff --git a/include/libssh/libssh.h b/include/libssh/libssh.h
index 7857a77b..403a1585 100644
--- a/include/libssh/libssh.h
+++ b/include/libssh/libssh.h
@@ -833,6 +833,7 @@ LIBSSH_API const char* ssh_get_hmac_in(ssh_session session);
LIBSSH_API const char* ssh_get_hmac_out(ssh_session session);
LIBSSH_API ssh_buffer ssh_buffer_new(void);
+LIBSSH_API ssh_buffer ssh_buffer_new_size(uint32_t size);
LIBSSH_API void ssh_buffer_free(ssh_buffer buffer);
#define SSH_BUFFER_FREE(x) \
do { if ((x) != NULL) { ssh_buffer_free(x); x = NULL; } } while(0)
@@ -843,6 +844,8 @@ LIBSSH_API void *ssh_buffer_get(ssh_buffer buffer);
LIBSSH_API uint32_t ssh_buffer_get_len(ssh_buffer buffer);
LIBSSH_API int ssh_session_set_disconnect_message(ssh_session session, const char *message);
+typedef ssize_t (*ssh_add_func) (void *ptr, size_t max_bytes, void *userdata);
+
#ifndef LIBSSH_LEGACY_0_4
#include "libssh/legacy.h"
#endif
diff --git a/include/libssh/sftp.h b/include/libssh/sftp.h diff --git a/include/libssh/sftp.h b/include/libssh/sftp.h
index c855df8a..1fd1710a 100644 index c855df8a..0fcdb9b8 100644
--- a/include/libssh/sftp.h --- a/include/libssh/sftp.h
+++ b/include/libssh/sftp.h +++ b/include/libssh/sftp.h
@@ -565,6 +565,9 @@ LIBSSH_API int sftp_async_read(sftp_file file, void *data, uint32_t len, uint32_ @@ -565,6 +565,10 @@ LIBSSH_API int sftp_async_read(sftp_file file, void *data, uint32_t len, uint32_
*/ */
LIBSSH_API ssize_t sftp_write(sftp_file file, const void *buf, size_t count); LIBSSH_API ssize_t sftp_write(sftp_file file, const void *buf, size_t count);
+LIBSSH_API int sftp_async_write(sftp_file file, const void *buf, size_t count, uint32_t* id); +LIBSSH_API ssize_t sftp_async_write(sftp_file file, ssh_add_func f, size_t count,
+ void *userdata, uint32_t* id);
+LIBSSH_API int sftp_async_write_end(sftp_file file, uint32_t id, int blocking); +LIBSSH_API int sftp_async_write_end(sftp_file file, uint32_t id, int blocking);
+ +
/** /**
@@ -73,26 +108,131 @@ index c090fef7..e2f86309 100644
endif (BUILD_STATIC_LIB) endif (BUILD_STATIC_LIB)
message(STATUS "Threads_FOUND=${Threads_FOUND}") message(STATUS "Threads_FOUND=${Threads_FOUND}")
diff --git a/src/buffer.c b/src/buffer.c
index e0068015..85e8dba1 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -141,6 +141,37 @@ struct ssh_buffer_struct *ssh_buffer_new(void)
return buf;
}
+/**
+ * @brief Create a new SSH buffer with specified size.
+ *
+ * @param[in] length for newly initialized SSH buffer.
+ * @return A newly initialized SSH buffer, NULL on error.
+ */
+struct ssh_buffer_struct *ssh_buffer_new_size(uint32_t len)
+{
+ struct ssh_buffer_struct *buf = NULL;
+ int rc;
+
+ buf = calloc(1, sizeof(struct ssh_buffer_struct));
+ if (buf == NULL) {
+ return NULL;
+ }
+
+ /*
+ * Always preallocate 64 bytes.
+ *
+ * -1 for realloc_buffer magic.
+ */
+ rc = ssh_buffer_allocate_size(buf, len);
+ if (rc != 0) {
+ SAFE_FREE(buf);
+ return NULL;
+ }
+ buffer_verify(buf);
+
+ return buf;
+}
+
/**
* @brief Deallocate a SSH buffer.
*
@@ -328,6 +359,49 @@ int ssh_buffer_add_data(struct ssh_buffer_struct *buffer, const void *data, uint
return 0;
}
+/**
+ * @brief Add data at the tail of a buffer by an external function
+ *
+ * @param[in] buffer The buffer to add data.
+ *
+ * @param[in] f function that adds data to the buffer.
+ *
+ * @param[in] max_bytes The maximum length of the data to add.
+ *
+ * @return actual bytes added on success, < 0 on error.
+ */
+ssize_t ssh_buffer_add_func(struct ssh_buffer_struct *buffer, ssh_add_func f,
+ size_t max_bytes, void *userdata)
+{
+ ssize_t actual;
+
+ if (buffer == NULL) {
+ return -1;
+ }
+
+ buffer_verify(buffer);
+
+ if (buffer->used + max_bytes < max_bytes) {
+ return -1;
+ }
+
+ if (buffer->allocated < (buffer->used + max_bytes)) {
+ if (buffer->pos > 0) {
+ buffer_shift(buffer);
+ }
+ if (realloc_buffer(buffer, buffer->used + max_bytes) < 0) {
+ return -1;
+ }
+ }
+
+ if ((actual = f(buffer->data + buffer->used, max_bytes, userdata)) < 0)
+ return -1;
+
+ buffer->used += actual;
+ buffer_verify(buffer);
+ return actual;
+}
+
/**
* @brief Ensure the buffer has at least a certain preallocated size.
*
diff --git a/src/sftp.c b/src/sftp.c diff --git a/src/sftp.c b/src/sftp.c
index e01012a8..7b5dc249 100644 index e01012a8..8e3a73c1 100644
--- a/src/sftp.c --- a/src/sftp.c
+++ b/src/sftp.c +++ b/src/sftp.c
@@ -2228,6 +2228,102 @@ ssize_t sftp_write(sftp_file file, const void *buf, size_t count) { @@ -2228,6 +2228,123 @@ ssize_t sftp_write(sftp_file file, const void *buf, size_t count) {
return -1; /* not reached */ return -1; /* not reached */
} }
+/* +/*
+ * sftp_async_write and sftp_async_write_end are copied from + * sftp_async_write is based on and sftp_async_write_end is copied from
+ * https://github.com/limes-datentechnik-gmbh/libssh + * https://github.com/limes-datentechnik-gmbh/libssh
+ *
+ * sftp_async_write has some optimizations:
+ * - use ssh_buffer_new_size() to reduce realoc_buffer.
+ * - use ssh_buffer_add_func() to avoid memcpy from read buffer to ssh buffer.
+ */ + */
+int sftp_async_write(sftp_file file, const void *buf, size_t count, uint32_t* id) { +ssize_t sftp_async_write(sftp_file file, ssh_add_func f, size_t count, void *userdata,
+ uint32_t* id) {
+ sftp_session sftp = file->sftp; + sftp_session sftp = file->sftp;
+ ssh_buffer buffer; + ssh_buffer buffer;
+ uint32_t buf_sz;
+ ssize_t actual;
+ int len; + int len;
+ int packetlen; + int packetlen;
+ int rc; + int rc;
+ +
+ buffer = ssh_buffer_new(); + buf_sz = (sizeof(uint32_t) + /* id */
+ ssh_string_len(file->handle) + 4 + /* file->handle */
+ sizeof(uint64_t) + /* file->offset */
+ sizeof(uint32_t) + /* count */
+ count); /* datastring */
+
+ buffer = ssh_buffer_new_size(buf_sz);
+ if (buffer == NULL) { + if (buffer == NULL) {
+ ssh_set_error_oom(sftp->session); + ssh_set_error_oom(sftp->session);
+ return -1; + return -1;
@@ -101,17 +241,25 @@ index e01012a8..7b5dc249 100644
+ *id = sftp_get_new_id(file->sftp); + *id = sftp_get_new_id(file->sftp);
+ +
+ rc = ssh_buffer_pack(buffer, + rc = ssh_buffer_pack(buffer,
+ "dSqdP", + "dSqd",
+ *id, + *id,
+ file->handle, + file->handle,
+ file->offset, + file->offset,
+ count, /* len of datastring */ + count); /* len of datastring */
+ (size_t)count, buf); +
+ if (rc != SSH_OK){ + if (rc != SSH_OK){
+ ssh_set_error_oom(sftp->session); + ssh_set_error_oom(sftp->session);
+ ssh_buffer_free(buffer); + ssh_buffer_free(buffer);
+ return SSH_ERROR; + return SSH_ERROR;
+ } + }
+
+ actual = ssh_buffer_add_func(buffer, f, count, userdata);
+ if (actual < 0){
+ ssh_set_error_oom(sftp->session);
+ ssh_buffer_free(buffer);
+ return SSH_ERROR;
+ }
+
+ packetlen=ssh_buffer_get_len(buffer)+5; + packetlen=ssh_buffer_get_len(buffer)+5;
+ len = sftp_packet_write(file->sftp, SSH_FXP_WRITE, buffer); + len = sftp_packet_write(file->sftp, SSH_FXP_WRITE, buffer);
+ ssh_buffer_free(buffer); + ssh_buffer_free(buffer);
@@ -125,9 +273,9 @@ index e01012a8..7b5dc249 100644
+ return SSH_ERROR; + return SSH_ERROR;
+ } + }
+ +
+ file->offset += count; + file->offset += actual;
+ +
+ return SSH_OK; + return actual;
+} +}
+ +
+int sftp_async_write_end(sftp_file file, uint32_t id, int blocking) { +int sftp_async_write_end(sftp_file file, uint32_t id, int blocking) {

View File

@@ -647,11 +647,16 @@ static sftp_file chunk_open_remote(const char *path, int flags, mode_t mode, siz
return sf; return sf;
} }
static ssize_t read_to_buf(void *ptr, size_t len, void *userdata)
{
int fd = *((int *)userdata);
return read(fd, ptr, len);
}
static int chunk_copy_local_to_remote_async(struct chunk *c, int fd, sftp_file sf, static int chunk_copy_local_to_remote_async(struct chunk *c, int fd, sftp_file sf,
int nr_ahead, int buf_sz, size_t *counter) int nr_ahead, int buf_sz, size_t *counter)
{ {
ssize_t read_bytes, remaind, thrown; ssize_t read_bytes, remaind, thrown;
char buf[buf_sz];
int idx, ret; int idx, ret;
struct { struct {
uint32_t id; uint32_t id;
@@ -663,15 +668,12 @@ static int chunk_copy_local_to_remote_async(struct chunk *c, int fd, sftp_file s
remaind = thrown = c->len; remaind = thrown = c->len;
for (idx = 0; idx < nr_ahead && thrown > 0; idx++) { for (idx = 0; idx < nr_ahead && thrown > 0; idx++) {
reqs[idx].len = min(thrown, sizeof(buf)); reqs[idx].len = min(thrown, buf_sz);
read_bytes = read(fd, buf, reqs[idx].len); reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
if (read_bytes < 0) { &reqs[idx].id);
pr_err("read: %s\n", strerrno()); if (reqs[idx].len < 0) {
return -1; pr_err("sftp_async_write: %d or %s\n",
} sftp_get_error(sf->sftp), strerrno());
ret = sftp_async_write(sf, buf, reqs[idx].len, &reqs[idx].id);
if (ret < 0) {
pr_err("sftp_async_write: %d\n", sftp_get_error(sf->sftp));
return -1; return -1;
} }
thrown -= reqs[idx].len; thrown -= reqs[idx].len;
@@ -693,25 +695,20 @@ static int chunk_copy_local_to_remote_async(struct chunk *c, int fd, sftp_file s
if (thrown <= 0) if (thrown <= 0)
continue; continue;
reqs[idx].len = min(thrown, sizeof(buf)); reqs[idx].len = min(thrown, buf_sz);
read_bytes = read(fd, buf, reqs[idx].len); reqs[idx].len = sftp_async_write(sf, read_to_buf, reqs[idx].len, &fd,
if (read_bytes < 0) { &reqs[idx].id);
pr_err("read: %s\n", strerrno()); if (reqs[idx].len < 0) {
return -1; pr_err("sftp_async_write: %d or %s\n",
} sftp_get_error(sf->sftp), strerrno());
ret = sftp_async_write(sf, buf, reqs[idx].len, &reqs[idx].id);
if (ret < 0) {
pr_err("sftp_async_write: %d\n", sftp_get_error(sf->sftp));
return -1; return -1;
} }
thrown -= reqs[idx].len; thrown -= reqs[idx].len;
} }
if (remaind < 0) { if (remaind < 0) {
pr_err("invalid remaind bytes %ld. last async_write_end bytes %lu. " pr_err("invalid remaind bytes %ld. last async_write_end bytes %lu.",
"last read bytes %ld\n", remaind, reqs[idx].len);
remaind, reqs[idx].len, read_bytes);
return -1; return -1;
} }