diff --git a/docs/config/ctl.rst b/docs/config/ctl.rst index b62602d5a..a020e1762 100644 --- a/docs/config/ctl.rst +++ b/docs/config/ctl.rst @@ -314,7 +314,25 @@ The CUDA provider currently exposes only the common statistics nodes. Level Zero memory provider (``LEVEL_ZERO``) ----------------------------------------------- -The Level Zero provider implements the same statistics nodes as the other providers. +The Level Zero provider supports the common statistics nodes described above and +adds the following parameter entry. + +.. py:function:: .params.use_import_export_for_IPC(policy) + + :param policy: Receives or supplies ``0`` to use the IPC API for memory sharing + and ``1`` to use the import/export mechanism for memory sharing. + :type policy: ``int`` + + **Access:** read-write. + **Defaults / Env:** Supported. + + Controls the memory exchange policy for inter-process communication + operations. When set to ``0`` (default), the provider uses the IPC API + for memory sharing between processes. When set to ``1``, the provider uses + the import/export mechanism for memory sharing. This option is supported + only on Windows with the Level Zero provider, where the default IPC mechanism + does not work. Note that enabling import/export adds overhead during + allocation and deallocation for all allocations on the current provider. 
Pool nodes ========== diff --git a/src/provider/provider_level_zero.c b/src/provider/provider_level_zero.c index b75c14094..7e9f32231 100644 --- a/src/provider/provider_level_zero.c +++ b/src/provider/provider_level_zero.c @@ -44,20 +44,26 @@ void fini_ze_global_state(void) { // Level Zero Memory Provider settings struct typedef struct umf_level_zero_memory_provider_params_t { - ze_context_handle_t - level_zero_context_handle; ///< Handle to the Level Zero context - ze_device_handle_t - level_zero_device_handle; ///< Handle to the Level Zero device + // Handle to the Level Zero context + ze_context_handle_t level_zero_context_handle; - umf_usm_memory_type_t memory_type; ///< Allocation memory type + // Handle to the Level Zero device + ze_device_handle_t level_zero_device_handle; - ze_device_handle_t * - resident_device_handles; ///< Array of devices for which the memory should be made resident - uint32_t - resident_device_count; ///< Number of devices for which the memory should be made resident + // Allocation memory type + umf_usm_memory_type_t memory_type; - umf_level_zero_memory_provider_free_policy_t - freePolicy; ///< Memory free policy + // Array of devices for which the memory should be made resident + ze_device_handle_t *resident_device_handles; + + // Number of devices for which the memory should be made resident + uint32_t resident_device_count; + + // Memory free policy + umf_level_zero_memory_provider_free_policy_t freePolicy; + + // Memory exchange policy 0 = IPC (default), 1 = import/export + int use_import_export_for_IPC; uint32_t device_ordinal; char name[64]; @@ -77,6 +83,9 @@ typedef struct ze_memory_provider_t { ze_driver_memory_free_policy_ext_flags_t freePolicyFlags; + // Memory exchange policy 0 = IPC (default), 1 = import/export + int use_import_export_for_IPC; + size_t min_page_size; uint32_t device_ordinal; @@ -134,7 +143,56 @@ static void store_last_native_error(int32_t native_error) { struct ctl ze_memory_ctl_root; static UTIL_ONCE_FLAG 
ctl_initialized = UTIL_ONCE_FLAG_INIT; +static ze_relaxed_allocation_limits_exp_desc_t relaxed_device_allocation_desc = + {.stype = ZE_STRUCTURE_TYPE_RELAXED_ALLOCATION_LIMITS_EXP_DESC, + .pNext = NULL, + .flags = ZE_RELAXED_ALLOCATION_LIMITS_EXP_FLAG_MAX_SIZE}; + +static ze_external_memory_export_desc_t memory_export_desc = { + .stype = ZE_STRUCTURE_TYPE_EXTERNAL_MEMORY_EXPORT_DESC, + .pNext = NULL, + .flags = ZE_EXTERNAL_MEMORY_TYPE_FLAG_OPAQUE_WIN32}; + +static umf_result_t CTL_READ_HANDLER(use_import_export_for_IPC)( + void *ctx, umf_ctl_query_source_t source, void *arg, size_t size, + umf_ctl_index_utlist_t *indexes) { + (void)source, (void)indexes; + + if (arg == NULL || size != sizeof(int)) { + LOG_ERR("arg is NULL or size is not valid"); + return UMF_RESULT_ERROR_INVALID_ARGUMENT; + } + + int *arg_out = arg; + ze_memory_provider_t *ze_provider = (ze_memory_provider_t *)ctx; + *arg_out = ze_provider->use_import_export_for_IPC; + return UMF_RESULT_SUCCESS; +} + +static umf_result_t CTL_WRITE_HANDLER(use_import_export_for_IPC)( + void *ctx, umf_ctl_query_source_t source, void *arg, size_t size, + umf_ctl_index_utlist_t *indexes) { + (void)source, (void)indexes; + + if (arg == NULL || size != sizeof(int)) { + LOG_ERR("arg is NULL or size is not valid"); + return UMF_RESULT_ERROR_INVALID_ARGUMENT; + } + + int arg_in = *(int *)arg; + ze_memory_provider_t *ze_provider = (ze_memory_provider_t *)ctx; + ze_provider->use_import_export_for_IPC = arg_in; + return UMF_RESULT_SUCCESS; +} + +static const struct ctl_argument + CTL_ARG(use_import_export_for_IPC) = CTL_ARG_INT; + +static const umf_ctl_node_t CTL_NODE(params)[] = { + CTL_LEAF_RW(use_import_export_for_IPC), CTL_NODE_END}; + static void initialize_ze_ctl(void) { + CTL_REGISTER_MODULE(&ze_memory_ctl_root, params); CTL_REGISTER_MODULE(&ze_memory_ctl_root, stats); } @@ -268,6 +326,7 @@ umf_result_t umfLevelZeroMemoryProviderParamsCreate( params->resident_device_handles = NULL; params->resident_device_count = 0; 
params->freePolicy = UMF_LEVEL_ZERO_MEMORY_PROVIDER_FREE_POLICY_DEFAULT; + params->use_import_export_for_IPC = 0; // disabled by default - use IPC params->device_ordinal = 0; strncpy(params->name, DEFAULT_NAME, sizeof(params->name) - 1); params->name[sizeof(params->name) - 1] = '\0'; @@ -421,11 +480,6 @@ static bool use_relaxed_allocation(ze_memory_provider_t *ze_provider, return size > ze_provider->device_properties.maxMemAllocSize; } -static ze_relaxed_allocation_limits_exp_desc_t relaxed_device_allocation_desc = - {.stype = ZE_STRUCTURE_TYPE_RELAXED_ALLOCATION_LIMITS_EXP_DESC, - .pNext = NULL, - .flags = ZE_RELAXED_ALLOCATION_LIMITS_EXP_FLAG_MAX_SIZE}; - static umf_result_t ze_memory_provider_free_helper(void *provider, void *ptr, size_t bytes, int update_stats) { @@ -483,11 +537,29 @@ static umf_result_t ze_memory_provider_alloc_helper(void *provider, size_t size, case UMF_MEMORY_TYPE_DEVICE: { ze_device_mem_alloc_desc_t dev_desc = { .stype = ZE_STRUCTURE_TYPE_DEVICE_MEM_ALLOC_DESC, - .pNext = use_relaxed_allocation(ze_provider, size) - ? 
&relaxed_device_allocation_desc - : NULL, + .pNext = NULL, .flags = 0, .ordinal = ze_provider->device_ordinal}; + void *lastNext = &dev_desc.pNext; + + ze_relaxed_allocation_limits_exp_desc_t + relaxed_device_allocation_desc_copy = + relaxed_device_allocation_desc; + if (use_relaxed_allocation(ze_provider, size)) { + // add relaxed allocation desc to the pNext chain + *(void **)lastNext = &relaxed_device_allocation_desc_copy; + lastNext = &relaxed_device_allocation_desc_copy.pNext; + } + + // check if the allocation should use import / export mechanism + ze_external_memory_export_desc_t memory_export_desc_copy = + memory_export_desc; + if (ze_provider->use_import_export_for_IPC == 1) { + // add external memory export desc to the pNext chain + *(void **)lastNext = &memory_export_desc_copy; + lastNext = &memory_export_desc_copy.pNext; + } + ze_result = g_ze_ops.zeMemAllocDevice(ze_provider->context, &dev_desc, size, alignment, ze_provider->device, resultPtr); @@ -647,6 +719,8 @@ static umf_result_t ze_memory_provider_initialize(const void *params, ze_provider->memory_type = umf2ze_memory_type(ze_params->memory_type); ze_provider->freePolicyFlags = umfFreePolicyToZePolicy(ze_params->freePolicy); + ze_provider->use_import_export_for_IPC = + ze_params->use_import_export_for_IPC; ze_provider->min_page_size = 0; ze_provider->device_ordinal = ze_params->device_ordinal; @@ -812,6 +886,7 @@ static umf_result_t ze_memory_provider_allocation_split(void *provider, typedef struct ze_ipc_data_t { int pid; + size_t size; ze_ipc_mem_handle_t ze_handle; } ze_ipc_data_t; @@ -827,20 +902,46 @@ static umf_result_t ze_memory_provider_get_ipc_handle(void *provider, const void *ptr, size_t size, void *providerIpcData) { - (void)size; - ze_result_t ze_result; ze_ipc_data_t *ze_ipc_data = (ze_ipc_data_t *)providerIpcData; struct ze_memory_provider_t *ze_provider = (struct ze_memory_provider_t *)provider; - ze_result = g_ze_ops.zeMemGetIpcHandle(ze_provider->context, ptr, - 
&ze_ipc_data->ze_handle); - if (ze_result != ZE_RESULT_SUCCESS) { - LOG_ERR("zeMemGetIpcHandle() failed."); - return ze2umf_result(ze_result); + if (ze_provider->use_import_export_for_IPC == 0) { + // default - IPC API + ze_result = g_ze_ops.zeMemGetIpcHandle(ze_provider->context, ptr, + &ze_ipc_data->ze_handle); + if (ze_result != ZE_RESULT_SUCCESS) { + LOG_ERR("zeMemGetIpcHandle() failed."); + return ze2umf_result(ze_result); + } + } else { + // import / export API (NOTE this requires additional flags enabled + // during the memory allocation) + ze_external_memory_export_fd_t fd_desc = { + .stype = ZE_STRUCTURE_TYPE_EXTERNAL_MEMORY_EXPORT_FD, + .pNext = NULL, + .flags = ZE_EXTERNAL_MEMORY_TYPE_FLAG_OPAQUE_WIN32, + .fd = 0}; + + ze_memory_allocation_properties_t mem_alloc_props = { + .stype = ZE_STRUCTURE_TYPE_MEMORY_ALLOCATION_PROPERTIES, + .pNext = &fd_desc, + .type = 0, + .id = 0, + .pageSize = 0}; + + ze_result = g_ze_ops.zeMemGetAllocProperties(ze_provider->context, ptr, + &mem_alloc_props, NULL); + if (ze_result != ZE_RESULT_SUCCESS) { + LOG_ERR("zeMemGetAllocProperties() failed."); + return ze2umf_result(ze_result); + } + + memcpy(&ze_ipc_data->ze_handle, &fd_desc.fd, sizeof(fd_desc.fd)); } + ze_ipc_data->size = size; ze_ipc_data->pid = utils_getpid(); return UMF_RESULT_SUCCESS; @@ -891,14 +992,41 @@ static umf_result_t ze_memory_provider_open_ipc_handle(void *provider, memcpy(&ze_ipc_handle, &fd_local, sizeof(fd_local)); } - ze_result = g_ze_ops.zeMemOpenIpcHandle( - ze_provider->context, ze_provider->device, ze_ipc_handle, 0, ptr); - if (fd_local != -1) { - (void)utils_close_fd(fd_local); - } - if (ze_result != ZE_RESULT_SUCCESS) { - LOG_ERR("zeMemOpenIpcHandle() failed."); - return ze2umf_result(ze_result); + if (ze_provider->use_import_export_for_IPC == 0) { + // default - IPC API + ze_result = g_ze_ops.zeMemOpenIpcHandle( + ze_provider->context, ze_provider->device, ze_ipc_handle, 0, ptr); + if (fd_local != -1) { + (void)utils_close_fd(fd_local); + } + 
if (ze_result != ZE_RESULT_SUCCESS) { + LOG_ERR("zeMemOpenIpcHandle() failed."); + return ze2umf_result(ze_result); + } + } else { + // import / export API + ze_external_memory_import_fd_t import_fd = { + .stype = ZE_STRUCTURE_TYPE_EXTERNAL_MEMORY_IMPORT_FD, + .pNext = NULL, + .flags = ZE_EXTERNAL_MEMORY_TYPE_FLAG_DMA_BUF, + .fd = fd_local}; + + ze_device_mem_alloc_desc_t alloc_desc = { + .stype = ZE_STRUCTURE_TYPE_DEVICE_MEM_ALLOC_DESC, + .pNext = &import_fd, + .flags = 0, + .ordinal = 0}; + ze_result = g_ze_ops.zeMemAllocDevice(ze_provider->context, &alloc_desc, + ze_ipc_data->size, 0, + ze_provider->device, ptr); + if (fd_local != -1) { + (void)utils_close_fd(fd_local); + } + + if (ze_result != ZE_RESULT_SUCCESS) { + LOG_ERR("zeMemAllocDevice() failed."); + return ze2umf_result(ze_result); + } } return UMF_RESULT_SUCCESS; diff --git a/src/utils/utils_windows_common.c b/src/utils/utils_windows_common.c index 7aa8f7684..4e1c63bd4 100644 --- a/src/utils/utils_windows_common.c +++ b/src/utils/utils_windows_common.c @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -47,10 +48,7 @@ int utils_getpid(void) { return GetCurrentProcessId(); } int utils_gettid(void) { return GetCurrentThreadId(); } -int utils_close_fd(int fd) { - (void)fd; // unused - return -1; -} +int utils_close_fd(int fd) { return CloseHandle((HANDLE)(uintptr_t)fd); } umf_result_t utils_errno_to_umf_result(int err) { (void)err; // unused @@ -58,10 +56,41 @@ umf_result_t utils_errno_to_umf_result(int err) { } umf_result_t utils_duplicate_fd(int pid, int fd_in, int *fd_out) { - (void)pid; // unused - (void)fd_in; // unused - (void)fd_out; // unused - return UMF_RESULT_ERROR_NOT_SUPPORTED; + umf_result_t ret = UMF_RESULT_SUCCESS; + HANDLE current_process_handle = GetCurrentProcess(); + if (!current_process_handle) { + LOG_ERR("GetCurrentProcess() failed."); + return UMF_RESULT_ERROR_UNKNOWN; + } + + HANDLE source_process_handle = OpenProcess(PROCESS_DUP_HANDLE, FALSE, pid); + if 
(!source_process_handle) { + LOG_ERR("OpenProcess() failed for pid=%d.", pid); + ret = UMF_RESULT_ERROR_UNKNOWN; + goto release_current; + } + + HANDLE handle_in = (HANDLE)(uintptr_t)fd_in; + HANDLE handle_out = NULL; + BOOL result = DuplicateHandle(source_process_handle, handle_in, + current_process_handle, &handle_out, + GENERIC_READ | GENERIC_WRITE, FALSE, 0); + if (!result) { + LOG_ERR("DuplicateHandle() failed for pid=%d fd_in=%d handle_in=%p", + pid, fd_in, handle_in); + ret = UMF_RESULT_ERROR_UNKNOWN; + goto release_source; + } + + *fd_out = (int)(uintptr_t)handle_out; + +release_source: + CloseHandle(source_process_handle); + +release_current: + CloseHandle(current_process_handle); + + return ret; } umf_result_t utils_translate_mem_protection_flags(unsigned in_protection, diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index d210b15f7..b6493b858 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -614,12 +614,37 @@ function(add_umf_ipc_test) set(SRC_DIR ${CMAKE_CURRENT_SOURCE_DIR}) endif() - file(COPY ${SRC_DIR}/${ARG_TEST}.sh DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) + if(WINDOWS) + set(EXT py) + else() + set(EXT sh) + endif() - add_test( - NAME ${TEST_NAME} - COMMAND ${ARG_TEST}.sh - WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + file(COPY ${SRC_DIR}/${ARG_TEST}.${EXT} + DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) + + if(WINDOWS) + add_test( + NAME ${TEST_NAME} + COMMAND python ${ARG_TEST}.${EXT} + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + else() + add_test( + NAME ${TEST_NAME} + COMMAND ${ARG_TEST}.${EXT} + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + endif() + + if(WINDOWS) + set_tests_properties(${TEST_NAME} PROPERTIES + ENVIRONMENT "BUILD_TYPE=${CMAKE_BUILD_TYPE}") + # add PATH to DLL on Windows + set(DLL_PATH_LIST + "${DLL_PATH_LIST};PATH=path_list_append:${CMAKE_BINARY_DIR}/bin/;PATH=path_list_append:${CMAKE_BINARY_DIR}/bin/$/" + ) + set_property(TEST ${TEST_NAME} PROPERTY ENVIRONMENT_MODIFICATION + "${DLL_PATH_LIST}") + 
endif() set_tests_properties(${TEST_NAME} PROPERTIES LABELS "umf") set_tests_properties(${TEST_NAME} PROPERTIES TIMEOUT 60) @@ -628,6 +653,42 @@ function(add_umf_ipc_test) endif() endfunction() +if(WINDOWS) + set(UMF_IPC_LIBS ws2_32) +endif() + +if(UMF_BUILD_GPU_TESTS AND UMF_LEVEL_ZERO_ENABLED) + build_umf_test( + NAME ipc_level_zero_prov_consumer + SRCS providers/ipc_level_zero_prov_consumer.c common/ipc_common.c + providers/ipc_level_zero_prov_common.c + ${UMF_UTILS_DIR}/utils_level_zero.cpp + LIBS ze_loader ${UMF_IPC_LIBS} ${UMF_UTILS_FOR_TEST}) + build_umf_test( + NAME ipc_level_zero_prov_producer + SRCS providers/ipc_level_zero_prov_producer.c common/ipc_common.c + providers/ipc_level_zero_prov_common.c + ${UMF_UTILS_DIR}/utils_level_zero.cpp + LIBS ze_loader ${UMF_IPC_LIBS} ${UMF_UTILS_FOR_TEST}) + add_umf_ipc_test(TEST ipc_level_zero_prov SRC_DIR providers) +endif() + +if(UMF_BUILD_GPU_TESTS AND UMF_BUILD_CUDA_PROVIDER) + build_umf_test( + NAME ipc_cuda_prov_consumer + SRCS providers/ipc_cuda_prov_consumer.c common/ipc_common.c + providers/ipc_cuda_prov_common.c providers/cuda_helpers.cpp + LIBS cuda ${UMF_IPC_LIBS} ${UMF_UTILS_FOR_TEST}) + build_umf_test( + NAME ipc_cuda_prov_producer + SRCS providers/ipc_cuda_prov_producer.c common/ipc_common.c + providers/ipc_cuda_prov_common.c providers/cuda_helpers.cpp + LIBS cuda ${UMF_IPC_LIBS} ${UMF_UTILS_FOR_TEST}) + add_umf_ipc_test(TEST ipc_cuda_prov SRC_DIR providers) +endif() + +# TODO IPC tests for OS, file, devdax providers and proxy lib are supported only +# on Linux - skipping if(LINUX) if(UMF_POOL_SCALABLE_ENABLED) build_umf_test( @@ -671,39 +732,11 @@ if(LINUX) add_umf_ipc_test(TEST ipc_file_prov_fsdax) endif() - # TODO add IPC tests for CUDA - - if(UMF_BUILD_GPU_TESTS AND UMF_LEVEL_ZERO_ENABLED) - build_umf_test( - NAME ipc_level_zero_prov_consumer - SRCS providers/ipc_level_zero_prov_consumer.c common/ipc_common.c - providers/ipc_level_zero_prov_common.c - ${UMF_UTILS_DIR}/utils_level_zero.cpp - LIBS 
ze_loader ${UMF_UTILS_FOR_TEST}) - build_umf_test( - NAME ipc_level_zero_prov_producer - SRCS providers/ipc_level_zero_prov_producer.c common/ipc_common.c - providers/ipc_level_zero_prov_common.c - ${UMF_UTILS_DIR}/utils_level_zero.cpp - LIBS ze_loader ${UMF_UTILS_FOR_TEST}) - add_umf_ipc_test(TEST ipc_level_zero_prov SRC_DIR providers) - endif() - - if(UMF_BUILD_GPU_TESTS AND UMF_BUILD_CUDA_PROVIDER) - build_umf_test( - NAME ipc_cuda_prov_consumer - SRCS providers/ipc_cuda_prov_consumer.c common/ipc_common.c - providers/ipc_cuda_prov_common.c providers/cuda_helpers.cpp - LIBS cuda ${UMF_UTILS_FOR_TEST}) - build_umf_test( - NAME ipc_cuda_prov_producer - SRCS providers/ipc_cuda_prov_producer.c common/ipc_common.c - providers/ipc_cuda_prov_common.c providers/cuda_helpers.cpp - LIBS cuda ${UMF_UTILS_FOR_TEST}) - add_umf_ipc_test(TEST ipc_cuda_prov SRC_DIR providers) - endif() else() - message(STATUS "IPC tests are supported on Linux only - skipping") + message( + STATUS + "IPC tests for OS, file, devdax providers and proxy lib are supported only on Linux - skipping" + ) endif() if(LINUX diff --git a/test/common/ipc_common.c b/test/common/ipc_common.c index 5e9b911be..879d39286 100644 --- a/test/common/ipc_common.c +++ b/test/common/ipc_common.c @@ -5,13 +5,21 @@ * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception */ +#ifdef _WIN32 +#define _WINSOCK_DEPRECATED_NO_WARNINGS +#include +typedef int socklen_t; +typedef SSIZE_T ssize_t; +#else #include -#include -#include -#include #include #include #include +#endif + +#include +#include +#include #include "ipc_common.h" @@ -53,25 +61,39 @@ Generally communication between the producer and the consumer looks like: */ int consumer_connect(int port) { + +#ifdef _WIN32 + WSADATA wsaData; + SOCKET producer_socket, consumer_socket; +#else + int producer_socket = -1; + int consumer_socket = -1; +#endif + struct sockaddr_in consumer_addr; struct sockaddr_in producer_addr; int producer_addr_len; - int producer_socket = -1; - 
int consumer_socket = -1; - int ret = -1; + +#ifdef _WIN32 + // initialize Winsock + if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { + fprintf(stderr, "WSAStartup failed: %d\n", WSAGetLastError()); + return -1; + } +#endif // create a socket consumer_socket = socket(AF_INET, SOCK_STREAM, 0); if (consumer_socket < 0) { fprintf(stderr, "[consumer] ERROR: creating socket failed\n"); - return -1; + goto err_WSA_cleanup; } fprintf(stderr, "[consumer] Socket created\n"); // set the IP address and the port consumer_addr.sin_family = AF_INET; - consumer_addr.sin_port = htons(port); + consumer_addr.sin_port = htons((uint16_t)port); consumer_addr.sin_addr.s_addr = inet_addr(INET_ADDR); // bind to the IP address and the port @@ -101,14 +123,24 @@ int consumer_connect(int port) { } fprintf(stderr, "[consumer] Producer connected at IP %s and port %i\n", - inet_ntoa(producer_addr.sin_addr), ntohs(producer_addr.sin_port)); + inet_ntoa(producer_addr.sin_addr), + (int)ntohs(producer_addr.sin_port)); - ret = producer_socket; // success + return (int)producer_socket; // success err_close_consumer_socket: +#ifdef _WIN32 + closesocket(consumer_socket); +#else close(consumer_socket); +#endif - return ret; +err_WSA_cleanup: +#ifdef _WIN32 + WSACleanup(); +#endif + + return -1; } int run_consumer(int port, const umf_memory_pool_ops_t *pool_ops, @@ -117,7 +149,13 @@ int run_consumer(int port, const umf_memory_pool_ops_t *pool_ops, void *provider_params, memcopy_callback_t memcopy_callback, void *memcopy_ctx) { char consumer_message[MSG_SIZE]; + +#ifdef _WIN32 + SOCKET producer_socket; +#else int producer_socket = -1; +#endif + int ret = -1; umf_memory_provider_handle_t provider = NULL; umf_result_t umf_result = UMF_RESULT_ERROR_UNKNOWN; @@ -171,8 +209,8 @@ int run_consumer(int port, const umf_memory_pool_ops_t *pool_ops, IPC_handle_size); // send confirmation to the producer (IPC handle size) - recv_len = - send(producer_socket, &IPC_handle_size, sizeof(IPC_handle_size), 0); + recv_len = 
send(producer_socket, (const char *)&IPC_handle_size, + sizeof(IPC_handle_size), 0); if (recv_len < 0) { fprintf(stderr, "[consumer] ERROR: sending confirmation failed\n"); goto err_free_recv_buffer; @@ -214,8 +252,8 @@ int run_consumer(int port, const umf_memory_pool_ops_t *pool_ops, strcpy(consumer_message, "SKIP"); // send the SKIP response to the producer - send(producer_socket, consumer_message, strlen(consumer_message) + 1, - 0); + send(producer_socket, consumer_message, + (int)strlen(consumer_message) + 1, 0); goto err_free_recv_buffer; } @@ -249,8 +287,8 @@ int run_consumer(int port, const umf_memory_pool_ops_t *pool_ops, strcpy(consumer_message, CONSUMER_MSG); // send response to the producer - if (send(producer_socket, consumer_message, strlen(consumer_message) + 1, - 0) < 0) { + if (send(producer_socket, consumer_message, + (int)strlen(consumer_message) + 1, 0) < 0) { fprintf(stderr, "[consumer] ERROR: send() failed\n"); goto err_closeIPCHandle; } @@ -273,7 +311,12 @@ int run_consumer(int port, const umf_memory_pool_ops_t *pool_ops, free(recv_buffer); err_close_producer_socket: +#ifdef _WIN32 + closesocket(producer_socket); + WSACleanup(); +#else close(producer_socket); +#endif err_umfMemoryPoolDestroy: umfPoolDestroy(pool); @@ -295,20 +338,35 @@ int run_consumer(int port, const umf_memory_pool_ops_t *pool_ops, int producer_connect(int port) { struct sockaddr_in consumer_addr; + +#ifdef _WIN32 + WSADATA wsaData; + SOCKET producer_socket; +#else int producer_socket = -1; +#endif + +#ifdef _WIN32 + // initialize Winsock + if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { + fprintf(stderr, "WSAStartup failed. 
Error Code: %d\n", + WSAGetLastError()); + return -1; + } +#endif // create a producer socket producer_socket = socket(AF_INET, SOCK_STREAM, 0); if (producer_socket < 0) { fprintf(stderr, "[producer] ERROR: Unable to create socket\n"); - return -1; + goto err_WSA_cleanup; } fprintf(stderr, "[producer] Socket created\n"); // set IP address and port the same as for the consumer consumer_addr.sin_family = AF_INET; - consumer_addr.sin_port = htons(port); + consumer_addr.sin_port = htons((uint16_t)port); consumer_addr.sin_addr.s_addr = inet_addr(INET_ADDR); // send connection request to the consumer @@ -321,10 +379,19 @@ int producer_connect(int port) { fprintf(stderr, "[producer] Connected to the consumer\n"); - return producer_socket; // success + return (int)producer_socket; // success err_close_producer_socket_connect: +#ifdef _WIN32 + closesocket(producer_socket); +#else close(producer_socket); +#endif + +err_WSA_cleanup: +#ifdef _WIN32 + WSACleanup(); +#endif return -1; } @@ -340,18 +407,20 @@ int run_producer(int port, const umf_memory_pool_ops_t *pool_ops, int producer_socket = -1; char consumer_message[MSG_SIZE]; +#if !defined(_WIN32) ret = prctl(PR_SET_PTRACER, getppid()); if (ret == -1) { perror("PR_SET_PTRACER may be not supported. 
prctl() call failed"); goto err_end; } +#endif // create OS memory provider umf_result = umfMemoryProviderCreate(provider_ops, provider_params, &provider); if (umf_result != UMF_RESULT_SUCCESS) { fprintf(stderr, "[producer] ERROR: creating memory provider failed\n"); - return -1; + goto err_end; } umf_memory_pool_handle_t pool; @@ -421,8 +490,8 @@ int run_producer(int port, const umf_memory_pool_ops_t *pool_ops, } // send the IPC_handle_size to the consumer - ssize_t len = - send(producer_socket, &IPC_handle_size, sizeof(IPC_handle_size), 0); + ssize_t len = send(producer_socket, (const char *)&IPC_handle_size, + sizeof(IPC_handle_size), 0); if (len < 0) { fprintf(stderr, "[producer] ERROR: unable to send the message\n"); goto err_close_producer_socket; @@ -459,7 +528,8 @@ int run_producer(int port, const umf_memory_pool_ops_t *pool_ops, } // send the IPC_handle of IPC_handle_size to the consumer - if (send(producer_socket, IPC_handle, IPC_handle_size, 0) < 0) { + if (send(producer_socket, (const char *)IPC_handle, (int)IPC_handle_size, + 0) < 0) { fprintf(stderr, "[producer] ERROR: unable to send the message\n"); goto err_close_producer_socket; } @@ -512,7 +582,12 @@ int run_producer(int port, const umf_memory_pool_ops_t *pool_ops, } err_close_producer_socket: +#ifdef _WIN32 + closesocket(producer_socket); + WSACleanup(); +#else close(producer_socket); +#endif err_PutIPCHandle: umf_result = umfPutIPCHandle(IPC_handle); diff --git a/test/providers/ipc_cuda_prov.py b/test/providers/ipc_cuda_prov.py new file mode 100755 index 000000000..9200278cd --- /dev/null +++ b/test/providers/ipc_cuda_prov.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2025 Intel Corporation +# +# Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT. 
+# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# + +import os +import sys +import time +import subprocess # nosec B404 +import platform + + +def main(): + # Port should be a number from the range <1024, 65535> + # Use PROCESS_ID environment variable if set, otherwise use current PID + pid = int(os.environ.get("PROCESS_ID", os.getpid())) + port = 1024 + (pid % (65535 - 1024)) + + # Set UMF_LOG environment variable + os.environ["UMF_LOG"] = "level:debug;flush:debug;output:stderr;pid:yes" + + build_type = os.environ.get("BUILD_TYPE", "Debug") + + # Determine executable extension based on platform + exe_ext = ".exe" if platform.system() == "Windows" else "" + + print(f"Starting test_ipc_cuda_prov CONSUMER on port {port} ...") + + # Start consumer process + consumer_cmd = [f"./{build_type}/test_ipc_cuda_prov_consumer{exe_ext}", str(port)] + with open("consumer_log.txt", "w") as consumer_log: + consumer_proc = subprocess.Popen( # nosec + consumer_cmd, stdout=consumer_log, stderr=subprocess.STDOUT + ) + + print("Waiting 5 sec ...") + time.sleep(5) + + print(f"Starting test_ipc_cuda_prov PRODUCER on port {port} ...") + + # Start producer process + producer_cmd = [f"./{build_type}/test_ipc_cuda_prov_producer{exe_ext}", str(port)] + with open("producer_log.txt", "w") as producer_log: + producer_proc = subprocess.Popen( # nosec + producer_cmd, stdout=producer_log, stderr=subprocess.STDOUT + ) + + print("Waiting 10 sec for the consumer and producer to finish ...") + time.sleep(10) + + # Wait for processes to complete + consumer_proc.wait() + producer_proc.wait() + + print("Test finished") + + # Display consumer log + print("Consumer log:") + try: + with open("consumer_log.txt", "r") as f: + print(f.read()) + except FileNotFoundError: + print("consumer_log.txt not found") + + # Display producer log + print("Producer log:") + try: + with open("producer_log.txt", "r") as f: + print(f.read()) + except FileNotFoundError: + print("producer_log.txt not found") + + # 
Check for errors in logs + error_found = False + for log_file in ["consumer_log.txt", "producer_log.txt"]: + try: + with open(log_file, "r") as f: + content = f.read().upper() + if "ERROR" in content or "FATAL" in content: + error_found = True + break + except FileNotFoundError: + continue + + if error_found: + print("Test failed: ERROR or FATAL found in logs.") + sys.exit(1) + + print("Test passed: No errors found in logs.") + + +if __name__ == "__main__": + main() diff --git a/test/providers/ipc_level_zero_prov.py b/test/providers/ipc_level_zero_prov.py new file mode 100755 index 000000000..b08bf5bdf --- /dev/null +++ b/test/providers/ipc_level_zero_prov.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2025 Intel Corporation +# +# Under the Apache License v2.0 with LLVM Exceptions. See LICENSE.TXT. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# + +import os +import sys +import time +import subprocess # nosec B404 +import platform + + +def main(): + # Port should be a number from the range <1024, 65535> + # Use PROCESS_ID environment variable if set, otherwise use current PID + pid = int(os.environ.get("PROCESS_ID", os.getpid())) + port = 1024 + (pid % (65535 - 1024)) + + # Set UMF_LOG environment variable + os.environ["UMF_LOG"] = "level:debug;flush:debug;output:stderr;pid:yes" + + build_type = os.environ.get("BUILD_TYPE", "Debug") + + # Determine executable extension based on platform + exe_ext = ".exe" if platform.system() == "Windows" else "" + + print(f"Starting test_ipc_level_zero_prov CONSUMER on port {port} ...") + + # Start consumer process + consumer_cmd = [ + f"./{build_type}/test_ipc_level_zero_prov_consumer{exe_ext}", + str(port), + ] + with open("consumer_log.txt", "w") as consumer_log: + consumer_proc = subprocess.Popen( # nosec + consumer_cmd, stdout=consumer_log, stderr=subprocess.STDOUT + ) + + print("Waiting 5 sec ...") + time.sleep(5) + + print(f"Starting test_ipc_level_zero_prov PRODUCER on port {port} ...") + + 
# Start producer process + producer_cmd = [ + f"./{build_type}/test_ipc_level_zero_prov_producer{exe_ext}", + str(port), + ] + with open("producer_log.txt", "w") as producer_log: + producer_proc = subprocess.Popen( # nosec + producer_cmd, stdout=producer_log, stderr=subprocess.STDOUT + ) + + print("Waiting 10 sec for the consumer and producer to finish ...") + time.sleep(10) + + # Wait for processes to complete + consumer_proc.wait() + producer_proc.wait() + + print("Test finished") + + # Display consumer log + print("Consumer log:") + try: + with open("consumer_log.txt", "r") as f: + print(f.read()) + except FileNotFoundError: + print("consumer_log.txt not found") + + # Display producer log + print("Producer log:") + try: + with open("producer_log.txt", "r") as f: + print(f.read()) + except FileNotFoundError: + print("producer_log.txt not found") + + # Check for errors in logs + error_found = False + for log_file in ["consumer_log.txt", "producer_log.txt"]: + try: + with open(log_file, "r") as f: + content = f.read().upper() + if "ERROR" in content or "FATAL" in content: + error_found = True + break + except FileNotFoundError: + continue + + if error_found: + print("Test failed: ERROR or FATAL found in logs.") + sys.exit(1) + + print("Test passed: No errors found in logs.") + + +if __name__ == "__main__": + main() diff --git a/test/providers/ipc_level_zero_prov_consumer.c b/test/providers/ipc_level_zero_prov_consumer.c index 5fb212881..abe08e7c8 100644 --- a/test/providers/ipc_level_zero_prov_consumer.c +++ b/test/providers/ipc_level_zero_prov_consumer.c @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -88,6 +89,15 @@ int main(int argc, char *argv[]) { goto destroy_provider_params; } +#ifdef _WIN32 + // NOTE: On Windows, we must use the import / export memory exchange policy + // because IPC currently does not work + int use_import_export_for_IPC = 1; + umfCtlSet( + "umf.provider.default.LEVEL_ZERO.params.use_import_export_for_IPC", + 
&use_import_export_for_IPC, sizeof(use_import_export_for_IPC)); +#endif + umf_disjoint_pool_params_handle_t pool_params = NULL; umf_result = umfDisjointPoolParamsCreate(&pool_params); diff --git a/test/providers/ipc_level_zero_prov_producer.c b/test/providers/ipc_level_zero_prov_producer.c index e6ffcf2ed..535020eae 100644 --- a/test/providers/ipc_level_zero_prov_producer.c +++ b/test/providers/ipc_level_zero_prov_producer.c @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -88,6 +89,15 @@ int main(int argc, char *argv[]) { goto destroy_provider_params; } +#ifdef _WIN32 + // NOTE: On Windows, we must use the import/export memory exchange policy + // because IPC currently does not work + int use_import_export_for_IPC = 1; + umfCtlSet( + "umf.provider.default.LEVEL_ZERO.params.use_import_export_for_IPC", + &use_import_export_for_IPC, sizeof(use_import_export_for_IPC)); +#endif + umf_disjoint_pool_params_handle_t pool_params = NULL; umf_result = umfDisjointPoolParamsCreate(&pool_params); diff --git a/test/providers/provider_level_zero.cpp b/test/providers/provider_level_zero.cpp index 9d72e4f36..1894d0aa4 100644 --- a/test/providers/provider_level_zero.cpp +++ b/test/providers/provider_level_zero.cpp @@ -476,6 +476,41 @@ TEST_P(umfLevelZeroProviderTest, ctl_stats) { umfMemoryProviderDestroy(provider); } +TEST_P(umfLevelZeroProviderTest, ctl_use_import_export_for_IPC) { + umf_memory_provider_handle_t provider = nullptr; + umf_result_t ret = umfMemoryProviderCreate(umfLevelZeroMemoryProviderOps(), + params, &provider); + ASSERT_EQ(ret, UMF_RESULT_SUCCESS); + ASSERT_NE(provider, nullptr); + + // Test reading the default value (0 = IPC) + int use_import_export_for_IPC = 1; // Set to invalid value first + ret = + umfCtlGet("umf.provider.by_handle.{}.params.use_import_export_for_IPC", + &use_import_export_for_IPC, sizeof(use_import_export_for_IPC), + provider); + ASSERT_EQ(ret, UMF_RESULT_SUCCESS); + ASSERT_EQ(use_import_export_for_IPC, 0); // Default is IPC 
(0) + + // Test writing a new value (1 = import/export) + int new_policy = 1; + ret = + umfCtlSet("umf.provider.by_handle.{}.params.use_import_export_for_IPC", + &new_policy, sizeof(new_policy), provider); + ASSERT_EQ(ret, UMF_RESULT_SUCCESS); + + // Test reading the updated value + use_import_export_for_IPC = 0; // Set to different value first + ret = + umfCtlGet("umf.provider.by_handle.{}.params.use_import_export_for_IPC", + &use_import_export_for_IPC, sizeof(use_import_export_for_IPC), + provider); + ASSERT_EQ(ret, UMF_RESULT_SUCCESS); + ASSERT_EQ(use_import_export_for_IPC, 1); // Should be import/export (1) + + umfMemoryProviderDestroy(provider); +} + TEST_P(umfLevelZeroProviderTest, custom_name) { const char *custom = "my_level_zero"; ASSERT_EQ(umfLevelZeroMemoryProviderParamsSetName(params, custom),