diff --git a/crt/aws-c-s3 b/crt/aws-c-s3 index 169842b7e..f20a0634e 160000 --- a/crt/aws-c-s3 +++ b/crt/aws-c-s3 @@ -1 +1 @@ -Subproject commit 169842b7e2f81d71d0719d4a77f9c3e186512f99 +Subproject commit f20a0634e65c907a7a024f7107d22dac63841235 diff --git a/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java b/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java index 376e01298..5bd654828 100644 --- a/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java +++ b/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java @@ -63,6 +63,7 @@ public ResumeToken build() { private long totalNumParts; private long numPartsCompleted; private String uploadId; + private String objectLastModified; public ResumeToken(PutResumeTokenBuilder builder) { this.nativeType = S3MetaRequestOptions.MetaRequestType.PUT_OBJECT.getNativeValue(); @@ -122,4 +123,17 @@ public String getUploadId() { return uploadId; } + /****** + * Download Specific fields. + ******/ + /** + * @return Object last modified time + */ + public String getObjectLastModifiedString() { + if (getType() != S3MetaRequestOptions.MetaRequestType.GET_OBJECT) { + throw new IllegalArgumentException("ResumeToken - Object last modified time is only defined for Get Object Resume tokens"); + } + + return objectLastModified; + } } diff --git a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequest.java b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequest.java index 7824bac78..596685ce2 100644 --- a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequest.java +++ b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequest.java @@ -6,6 +6,7 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.CrtRuntimeException; public class S3MetaRequest extends CrtResource { @@ -75,6 +76,20 @@ public ResumeToken pause() { return s3MetaRequestPause(getNativeHandle()); } + public CompletableFuture pauseAsync() { + CompletableFuture future = new CompletableFuture<>(); + if (isNull()) { + throw new IllegalStateException("S3MetaRequest has been closed."); + } + try { + s3MetaRequestPauseAsync(getNativeHandle(), future); + } catch (Exception e) { + future.completeExceptionally(e); + } + + return future; + } + /** * Increment the flow-control window, so that response data continues downloading. *

@@ -114,5 +129,9 @@ public void incrementReadWindow(long bytes) { private static native ResumeToken s3MetaRequestPause(long s3MetaRequest); + private static native void s3MetaRequestPauseAsync( + long s3MetaRequest, + CompletableFuture future) throws CrtRuntimeException; + private static native void s3MetaRequestIncrementReadWindow(long s3MetaRequest, long bytes); } diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index ffa0041f0..621cdf526 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -988,6 +988,9 @@ static void s_cache_s3_meta_request_resume_token(JNIEnv *env) { s3_meta_request_resume_token_properties.upload_id_field_id = (*env)->GetFieldID(env, cls, "uploadId", "Ljava/lang/String;"); AWS_FATAL_ASSERT(s3_meta_request_resume_token_properties.upload_id_field_id); + s3_meta_request_resume_token_properties.object_last_modified_field_id = + (*env)->GetFieldID(env, cls, "objectLastModified", "Ljava/lang/String;"); + AWS_FATAL_ASSERT(s3_meta_request_resume_token_properties.object_last_modified_field_id); } struct java_aws_mqtt5_connack_packet_properties mqtt5_connack_packet_properties; diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index 304c86fe1..d93db1f5e 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -455,6 +455,7 @@ struct java_aws_s3_meta_request_resume_token { jfieldID total_num_parts_field_id; jfieldID num_parts_completed_field_id; jfieldID upload_id_field_id; + jfieldID object_last_modified_field_id; }; extern struct java_aws_s3_meta_request_resume_token s3_meta_request_resume_token_properties; diff --git a/src/native/s3_client.c b/src/native/s3_client.c index 7754387b3..1b96a5e8a 100644 --- a/src/native/s3_client.c +++ b/src/native/s3_client.c @@ -1210,28 +1210,7 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRe aws_s3_meta_request_cancel(meta_request); } -JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestPause( - JNIEnv *env, - jclass jni_class, - jlong jni_s3_meta_request) { - - (void)jni_class; - aws_cache_jni_ids(env); - - struct aws_s3_meta_request *meta_request = (struct aws_s3_meta_request *)jni_s3_meta_request; - if (!meta_request) { - aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); - aws_jni_throw_illegal_argument_exception(env, "S3MetaRequest.s3MetaRequestPause: Invalid/null meta request"); - return NULL; - } - - struct aws_s3_meta_request_resume_token *resume_token = NULL; - - if (aws_s3_meta_request_pause(meta_request, &resume_token)) { - aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to pause request"); - return NULL; - } - +static jobject s_java_resume_token_from_native(JNIEnv *env, struct aws_s3_meta_request_resume_token *resume_token) { jobject resume_token_jni = NULL; if (resume_token != NULL) { resume_token_jni = (*env)->NewObject( @@ -1240,15 +1219,10 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3Met s3_meta_request_resume_token_properties.s3_meta_request_resume_token_constructor_method_id); if ((*env)->ExceptionCheck(env) || resume_token_jni == NULL) { aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to create ResumeToken."); - goto on_done; + goto done; } enum aws_s3_meta_request_type type = aws_s3_meta_request_resume_token_type(resume_token); - if (type != AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) { - aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to convert resume token."); - goto on_done; - } - (*env)->SetIntField(env, resume_token_jni, s3_meta_request_resume_token_properties.native_type_field_id, type); (*env)->SetLongField( env, @@ -1267,17 +1241,147 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3Met aws_s3_meta_request_resume_token_num_parts_completed(resume_token)); struct aws_byte_cursor upload_id_cur = aws_s3_meta_request_resume_token_upload_id(resume_token); - jstring upload_id_jni = aws_jni_string_from_cursor(env, &upload_id_cur); - (*env)->SetObjectField( - env, resume_token_jni, s3_meta_request_resume_token_properties.upload_id_field_id, upload_id_jni); + if (upload_id_cur.len > 0) { + jstring upload_id_jni = aws_jni_string_from_cursor(env, &upload_id_cur); + (*env)->SetObjectField( + env, resume_token_jni, s3_meta_request_resume_token_properties.upload_id_field_id, upload_id_jni); + + (*env)->DeleteLocalRef(env, upload_id_jni); + } + struct aws_byte_cursor object_last_modified_cur = aws_s3_meta_request_resume_object_last_modified(resume_token); + if (object_last_modified_cur.len > 0) { + jstring object_last_modified = aws_jni_string_from_cursor(env, &object_last_modified_cur); + (*env)->SetObjectField( + env, + resume_token_jni, + s3_meta_request_resume_token_properties.object_last_modified_field_id, + object_last_modified); + + (*env)->DeleteLocalRef(env, object_last_modified); + } + printf("Resume token: %p\n", (void *)resume_token); + printf("Java resume token: %p\n", (void *)resume_token_jni); + printf("Resume token type: %d\n", type); + printf( + "Resume token part size: %llu\n", + (unsigned long long)aws_s3_meta_request_resume_token_part_size(resume_token)); + printf( + "Resume token total num parts: %llu\n", + (unsigned long long)aws_s3_meta_request_resume_token_total_num_parts(resume_token)); + printf( + "Resume token num parts completed: %llu\n", + (unsigned long long)aws_s3_meta_request_resume_token_num_parts_completed(resume_token)); + } +done: + return resume_token_jni; +} + +JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestPause( + JNIEnv *env, + jclass jni_class, + jlong jni_s3_meta_request) { + + (void)jni_class; + aws_cache_jni_ids(env); - (*env)->DeleteLocalRef(env, upload_id_jni); + struct aws_s3_meta_request *meta_request = (struct aws_s3_meta_request *)jni_s3_meta_request; + if (!meta_request) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + aws_jni_throw_illegal_argument_exception(env, "S3MetaRequest.s3MetaRequestPause: Invalid/null meta request"); + return NULL; + } + + struct aws_s3_meta_request_resume_token *resume_token = NULL; + + if (aws_s3_meta_request_pause(meta_request, &resume_token)) { + aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to pause request"); + return NULL; } -on_done: + jobject resume_token_jni = s_java_resume_token_from_native(env, resume_token); aws_s3_meta_request_resume_token_release(resume_token); return resume_token_jni; } +struct s_pause_callback_data { + JavaVM *jvm; + jobject java_pause_future; +}; + +static void s_s3_meta_request_pause_complete( + struct aws_s3_meta_request *meta_request, + struct aws_s3_meta_request_resume_token *resume_token, + void *user_data) { + (void)meta_request; + struct s_pause_callback_data *callback_data = (struct s_pause_callback_data *)user_data; + /********** JNI ENV ACQUIRE **********/ + JNIEnv *env = aws_jni_acquire_thread_env(callback_data->jvm); + JavaVM *jvm = callback_data->jvm; + if (env == NULL) { + /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */ + return; + } + + jobject resume_token_jni = s_java_resume_token_from_native(env, resume_token); + if (resume_token_jni == NULL) { + jobject exception; + if (!aws_jni_get_and_clear_exception(env, &exception)) { + /* No exception raised from Java, create our own */ + exception = aws_jni_new_crt_exception_from_error_code(env, aws_last_error()); + } + + (*env)->CallBooleanMethod( + env, + callback_data->java_pause_future, + completable_future_properties.complete_exceptionally_method_id, + exception); + + goto done; + } + + (*env)->CallBooleanMethod( + env, callback_data->java_pause_future, completable_future_properties.complete_method_id, resume_token_jni); + (*env)->DeleteLocalRef(env, resume_token_jni); + +done: + (*env)->DeleteGlobalRef(env, callback_data->java_pause_future); + aws_mem_release(aws_jni_get_allocator(), callback_data); + aws_jni_release_thread_env(jvm, env); + /********** JNI ENV RELEASE **********/ +} + +JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestPauseAsync( + JNIEnv *env, + jclass jni_class, + jlong jni_s3_meta_request, + jobject java_pause_future) { + + (void)jni_class; + aws_cache_jni_ids(env); + + struct aws_s3_meta_request *meta_request = (struct aws_s3_meta_request *)jni_s3_meta_request; + if (!meta_request) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + aws_jni_throw_illegal_argument_exception( + env, "S3MetaRequest.s3MetaRequestPauseAsync: Invalid/null meta request"); + return; + } + + struct aws_allocator *allocator = aws_jni_get_allocator(); + struct s_pause_callback_data *callback_data = aws_mem_calloc(allocator, 1, sizeof(struct s_pause_callback_data)); + jint jvmresult = (*env)->GetJavaVM(env, &callback_data->jvm); + AWS_FATAL_ASSERT(jvmresult == 0); + + callback_data->java_pause_future = (*env)->NewGlobalRef(env, java_pause_future); + AWS_FATAL_ASSERT(callback_data->java_pause_future != NULL); + + if (aws_s3_meta_request_pause_async(meta_request, s_s3_meta_request_pause_complete, callback_data)) { + printf("Failed to pause async, last error: %d\n", aws_last_error()); + (*env)->DeleteGlobalRef(env, callback_data->java_pause_future); + aws_mem_release(allocator, callback_data); + aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPauseAsync: Failed to pause request"); + return; + } +} JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestIncrementReadWindow( JNIEnv *env, diff --git a/src/test/java/software/amazon/awssdk/crt/test/S3ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/S3ClientTest.java index da80c5c75..6156fde9a 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/S3ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/S3ClientTest.java @@ -337,14 +337,24 @@ public void onFinished(S3FinishedResponseContext context) { }; HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; - HttpRequest httpRequest = new HttpRequest("GET", PRE_EXIST_1MB_PATH, headers, null); + HttpRequest httpRequest = new HttpRequest("GET", "/pre-existing-2GB", headers, null); S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) .withResponseHandler(responseHandler); try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { - Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); + Thread.sleep(3000); // sleep 1 sec to pause + CompletableFuture pauseFuture = metaRequest.pauseAsync(); + Assert.assertNotNull(pauseFuture); + ResumeToken token = pauseFuture.get(); + // ResumeToken token = metaRequest.pause(); + Assert.assertNotNull(token); + System.err.println("Resume token getObjectLastModifiedString from Java: " + token.getObjectLastModifiedString()); + System.err.println("Resume token part size from Java: " + token.getPartSize()); + System.err.println("Resume token getNumPartsCompleted from Java: " + token.getNumPartsCompleted()); + System.err.println("Resume token getTotalNumParts from Java: " + token.getTotalNumParts()); + // Assert.assertEquals(Integer.valueOf(14352), onFinishedFuture.get()); } } catch (InterruptedException | ExecutionException ex) { Assert.fail(ex.getMessage());