-
Notifications
You must be signed in to change notification settings - Fork 0
QDB-10908 - Add Arrow query API support to Python API #108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
85088f1
e155e3b
fdd991f
fa6a636
42b6cc8
78b6f2a
4797f5e
6d0a8d9
df04b1e
c96db3e
7d8143e
0cad78c
d5d5ae2
c557e37
a7722d5
6a7ca8f
95d5de5
e036c6e
67b877b
0f14b8a
ab98f8b
638a9a4
9c08e4f
dd9426d
7482652
a42339b
94d7a7a
0f5769c
868a78d
966e140
aea6bb7
7cdd5a6
cc57cb5
df9b4b6
32d9b24
7749d35
ab7dff2
00e68ad
a8d0e5f
b249687
2c4fde7
d1e5536
9b4912f
1ec2bd8
4ee0770
493363c
e5adffc
cde8d0b
af983e0
5a5ab22
9083933
6a679d9
e6e1fd0
0ed4c18
9779674
4eb78d5
d5e0951
5bd1c25
bee83e8
4154ccb
970b7de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -148,3 +148,4 @@ insecure/ | |
| secure/ | ||
| user_private.key | ||
| users.cfg | ||
| tests/test_perf.out | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
|
|
||
| # Numpy / Pandas are difficult to build in various environments (especially | ||
| # FreeBSD), and there are some conflicting requirements. For example, Numpy | ||
| # doesn't have any version that works on both Python 3.6 and Python 3.10. | ||
|
|
||
| # numpy ~= 1.19.5; python_version <= '3.7' | ||
| numpy ~= 1.20.3; python_version == '3.8' | ||
| numpy ~= 1.20.3; python_version == '3.9' | ||
| numpy >= 2.0.1; python_version > '3.9' | ||
|
|
||
| pandas ~= 2.0.3; python_version == '3.9' | ||
| pandas ~= 2.0.3; python_version == '3.8' | ||
| pandas ~= 1.3.5; python_version == '3.7' | ||
| pandas ~= 1.1.5; python_version <= '3.6' | ||
|
|
||
| # Need 2.1.2 for numpy 2.0 support | ||
| pandas >= 2.1.2; python_version > '3.9' | ||
|
|
||
| ## Any environment | ||
|
|
||
| build | ||
| cmake | ||
| ninja | ||
| setuptools >= 61 | ||
| wheel | ||
|
|
||
| pytest | ||
| pytest-runner | ||
| pytest-benchmark | ||
|
|
||
| # Seems like numpy>2 requires this in combination with pytest, | ||
| # but is never set in the requirements. | ||
| hypothesis | ||
|
|
||
| teamcity-messages == 1.29 | ||
| setuptools-git == 1.2 | ||
|
|
||
| # Linting | ||
| black==24.10.0; python_version >= '3.9' | ||
| black==23.3.0; python_version < '3.9' | ||
|
|
||
| # Stubs | ||
| mypy | ||
| pybind11-stubgen | ||
| pandas-stubs |
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the purpose of introducing an entirely new file? Wouldn't we want this to be in |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,272 @@ | ||
| /* | ||
| * | ||
| * Official Python API | ||
| * | ||
| * Copyright (c) 2009-2021, quasardb SAS. All rights reserved. | ||
| * All rights reserved. | ||
| * | ||
| * Redistribution and use in source and binary forms, with or without | ||
| * modification, are permitted provided that the following conditions are met: | ||
| * | ||
| * * Redistributions of source code must retain the above copyright | ||
| * notice, this list of conditions and the following disclaimer. | ||
| * * Redistributions in binary form must reproduce the above copyright | ||
| * notice, this list of conditions and the following disclaimer in the | ||
| * documentation and/or other materials provided with the distribution. | ||
| * * Neither the name of quasardb nor the names of its contributors may | ||
| * be used to endorse or promote products derived from this software | ||
| * without specific prior written permission. | ||
| * | ||
| * THIS SOFTWARE IS PROVIDED BY QUASARDB AND CONTRIBUTORS ``AS IS'' AND ANY | ||
| * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | ||
| * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | ||
| * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY | ||
| * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | ||
| * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | ||
| * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | ||
| * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
| * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | ||
| * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
| */ | ||
|
|
||
| #include "arrow_batch_push.hpp" | ||
| #include <convert/value.hpp> | ||
| #include <detail/writer.hpp> | ||
| #include <algorithm> | ||
| #include <metrics.hpp> | ||
|
|
||
| namespace qdb | ||
| { | ||
|
|
||
| namespace | ||
| { | ||
|
|
||
| class arrow_stream_holder | ||
| { | ||
| public: | ||
| explicit arrow_stream_holder(pybind11::object reader) | ||
| : _reader{std::move(reader)} | ||
| { | ||
| pybind11::gil_scoped_acquire gil; | ||
|
|
||
| ArrowArrayStream tmp{}; | ||
| try | ||
| { | ||
| _reader.attr("_export_to_c")(pybind11::int_(reinterpret_cast<uintptr_t>(&tmp))); | ||
| } | ||
| catch (...) | ||
| { | ||
| safe_release(tmp); | ||
| throw; | ||
| } | ||
|
|
||
| _stream = tmp; | ||
| } | ||
|
|
||
| ~arrow_stream_holder() | ||
| { | ||
| try | ||
| { | ||
| reset(); | ||
| } | ||
| catch (...) | ||
| { | ||
| // do nothing, we must not throw from the destructor | ||
| } | ||
| } | ||
|
|
||
| void detach() noexcept | ||
| { | ||
| invalidate_stream(); | ||
| } | ||
|
|
||
| arrow_stream_holder(const arrow_stream_holder &) = delete; | ||
| arrow_stream_holder & operator=(const arrow_stream_holder &) = delete; | ||
| arrow_stream_holder(arrow_stream_holder &&) = delete; | ||
| arrow_stream_holder & operator=(arrow_stream_holder &&) = delete; | ||
|
|
||
| ArrowArrayStream & stream() noexcept | ||
| { | ||
| return _stream; | ||
| } | ||
|
|
||
| ArrowArrayStream release_stream() noexcept | ||
| { | ||
| ArrowArrayStream out = _stream; | ||
| invalidate_stream(); | ||
| return out; | ||
| } | ||
|
|
||
| private: | ||
| void reset() noexcept | ||
| { | ||
| if (_stream.release) | ||
| { | ||
| _stream.release(&_stream); | ||
| invalidate_stream(); | ||
| } | ||
| } | ||
|
|
||
| static void invalidate_stream_struct(ArrowArrayStream & s) noexcept | ||
| { | ||
| s.release = nullptr; | ||
| s.get_next = nullptr; | ||
| s.get_schema = nullptr; | ||
| s.private_data = nullptr; | ||
| } | ||
|
|
||
| void invalidate_stream() noexcept | ||
| { | ||
| invalidate_stream_struct(_stream); | ||
| } | ||
|
|
||
| static void safe_release(ArrowArrayStream & s) noexcept | ||
| { | ||
| if (!s.release) return; | ||
|
|
||
| if (Py_IsInitialized()) | ||
| { | ||
| pybind11::gil_scoped_acquire gil; | ||
| s.release(&s); | ||
| } | ||
| else | ||
| { | ||
| // Oops. Python is not initialized | ||
| } | ||
|
|
||
| invalidate_stream_struct(s); | ||
| } | ||
|
|
||
| pybind11::object _reader; | ||
| ArrowArrayStream _stream{}; | ||
| }; | ||
|
|
||
| struct arrow_batch | ||
| { | ||
| arrow_stream_holder stream; | ||
| std::vector<std::string> duplicate_names; | ||
| std::vector<char const *> duplicate_ptrs; | ||
|
|
||
| explicit arrow_batch(pybind11::object reader) | ||
| : stream{std::move(reader)} | ||
| {} | ||
|
|
||
| static inline void set_deduplication_mode( | ||
| detail::deduplication_mode_t mode, bool columns, qdb_exp_batch_push_arrow_t & out) | ||
| { | ||
| // Set deduplication mode only when `columns` is true, in which we will deduplicate based on | ||
| // *all* columns. | ||
| out.deduplication_mode = | ||
| (columns == true ? detail::to_qdb(mode) : qdb_exp_batch_deduplication_mode_disabled); | ||
| } | ||
|
|
||
| inline void set_deduplication_mode(detail::deduplication_mode_t mode, | ||
| std::vector<std::string> const & columns, | ||
| qdb_exp_batch_push_arrow_t & out) | ||
| { | ||
| duplicate_names = columns; // save names to keep them alive | ||
|
|
||
| duplicate_ptrs.resize(duplicate_names.size()); | ||
| std::transform(duplicate_names.begin(), duplicate_names.end(), duplicate_ptrs.begin(), | ||
| [](std::string const & s) { return s.c_str(); }); | ||
|
|
||
| out.deduplication_mode = detail::to_qdb(mode); | ||
| out.where_duplicate = duplicate_ptrs.data(); | ||
| out.where_duplicate_count = static_cast<qdb_size_t>(duplicate_ptrs.size()); | ||
| } | ||
|
|
||
| qdb_exp_batch_push_arrow_t build(const std::string & table_name, | ||
| const detail::deduplicate_options & dedup, | ||
| qdb_ts_range_t * ranges, | ||
| qdb_size_t ranges_count) | ||
| { | ||
| qdb_exp_batch_push_arrow_t batch{}; | ||
|
|
||
| batch.name = table_name.c_str(); | ||
| batch.stream = stream.stream(); | ||
| batch.truncate_ranges = ranges; | ||
| batch.truncate_range_count = (ranges == nullptr ? 0u : ranges_count); | ||
| batch.where_duplicate = nullptr; | ||
| batch.where_duplicate_count = 0u; | ||
|
|
||
| std::visit([&mode = dedup.mode_, &batch, this]( | ||
| auto const & columns) { set_deduplication_mode(mode, columns, batch); }, | ||
| dedup.columns_); | ||
|
|
||
| stream.detach(); | ||
| return batch; | ||
| } | ||
| }; | ||
|
|
||
| } // namespace | ||
|
|
||
| void exp_batch_push_arrow_with_options(handle_ptr handle, | ||
| const std::string & table_name, | ||
| const pybind11::object & reader, | ||
| pybind11::kwargs args) | ||
| { | ||
|
Comment on lines
+203
to
+207
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this a qdb-api-python-specific function, or something from our C API? Because it doesn't integrate well / match the design of the existing
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like most of this code is duplicated from elsewhere? |
||
| auto dedup = detail::deduplicate_options::from_kwargs(args); | ||
| qdb_exp_batch_options_t options = detail::batch_options::from_kwargs(args); | ||
|
|
||
| std::vector<qdb_ts_range_t> truncate_ranges; | ||
| qdb_ts_range_t * range_ptr = nullptr; | ||
|
|
||
| if (options.mode == qdb_exp_batch_push_truncate) | ||
| [[unlikely]] // Unlikely because truncate isn't used much | ||
| { | ||
| if (args.contains(detail::batch_truncate_ranges::kw_range)) | ||
| { | ||
| truncate_ranges = detail::batch_truncate_ranges::from_kwargs(args); | ||
| range_ptr = truncate_ranges.data(); | ||
| } | ||
| else | ||
| { | ||
| throw qdb::invalid_argument_exception{"No truncate range provided."}; | ||
| } | ||
| } | ||
|
|
||
| arrow_batch batch{reader}; | ||
| auto c_batch = batch.build(table_name, dedup, range_ptr, truncate_ranges.size()); | ||
|
|
||
| qdb::logger logger("quasardb.batch_push_arrow"); | ||
| logger.debug("Pushing Arrow stream in %s using %s push mode", table_name, | ||
| detail::batch_push_mode::to_string(options.mode)); | ||
| qdb_error_t err{qdb_e_ok}; | ||
| { | ||
| // Make sure to measure the time it takes to do the actual push. | ||
| // This is in its own scoped block so that we only actually measure | ||
| // the push time, not e.g. retry time. | ||
| qdb::metrics::scoped_capture capture{"qdb_batch_push_arrow"}; | ||
|
|
||
| err = qdb_exp_batch_push_arrow_with_options(*handle, &options, &c_batch, nullptr, 1u); | ||
| } | ||
|
|
||
| auto retry_options = detail::retry_options::from_kwargs(args); | ||
| if (retry_options.should_retry(err)) | ||
| [[unlikely]] // Unlikely, because err is most likely to be qdb_e_ok | ||
| { | ||
| if (err == qdb_e_async_pipe_full) [[likely]] | ||
| { | ||
| logger.info("Async pipelines are currently full"); | ||
| } | ||
| else | ||
| { | ||
| logger.warn("A temporary error occurred"); | ||
| } | ||
|
|
||
| std::chrono::milliseconds delay = retry_options.delay; | ||
| logger.info("Sleeping for %d milliseconds", delay.count()); | ||
|
|
||
| std::this_thread::sleep_for(delay); | ||
|
|
||
| // Now try again -- easier way to go about this is to enter recursion. Note how | ||
| // we permutate the retry_options, which automatically adjusts the amount of retries | ||
| // left and the next sleep duration. | ||
| logger.warn("Retrying push operation, retries left: %d", retry_options.retries_left); | ||
| err = qdb_exp_batch_push_arrow_with_options(*handle, &options, &c_batch, nullptr, 1u); | ||
| } | ||
|
|
||
| qdb::qdb_throw_if_error(*handle, err); | ||
| } | ||
|
|
||
| } // namespace qdb | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* | ||
| * | ||
| * Official Python API | ||
| * | ||
| * Copyright (c) 2009-2021, quasardb SAS. All rights reserved. | ||
| * All rights reserved. | ||
| * | ||
| * Redistribution and use in source and binary forms, with or without | ||
| * modification, are permitted provided that the following conditions are met: | ||
| * | ||
| * * Redistributions of source code must retain the above copyright | ||
| * notice, this list of conditions and the following disclaimer. | ||
| * * Redistributions in binary form must reproduce the above copyright | ||
| * notice, this list of conditions and the following disclaimer in the | ||
| * documentation and/or other materials provided with the distribution. | ||
| * * Neither the name of quasardb nor the names of its contributors may | ||
| * be used to endorse or promote products derived from this software | ||
| * without specific prior written permission. | ||
| * | ||
| * THIS SOFTWARE IS PROVIDED BY QUASARDB AND CONTRIBUTORS ``AS IS'' AND ANY | ||
| * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | ||
| * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | ||
| * DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY | ||
| * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | ||
| * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | ||
| * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | ||
| * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | ||
| * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | ||
| * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||
| */ | ||
|
|
||
| #pragma once | ||
|
|
||
| #include "handle.hpp" | ||
| #include <pybind11/pybind11.h> | ||
|
|
||
| namespace qdb | ||
| { | ||
|
|
||
| void exp_batch_push_arrow_with_options(handle_ptr handle, | ||
| const std::string & table_name, | ||
| const pybind11::object & reader, | ||
| pybind11::kwargs args); | ||
|
|
||
| } // namespace qdb |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you introduce a whole new file for this? This is only for development, not for actual dependencies / releasing?
I think we dropped win32 support, does that make things easier?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont understand this idea to accumulate table by individual columns on C level. It needs when we work with the
qdb tableobject.Batch writers operate with whole table. And we can prepare this table on the python level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem concerns memory ownership, native numpy arrays, and differences in representation.
Let's say I have this code:
The lifetime of
df(and all numpy arrays underneath it) are only scoped to the for() loop.Additionally, timestamps and string data are represented in entirely different ways between our C API and Python.
Additionally, Python offers no guarantees at all about memory stability between function invocations: all memory can be moved around in between multiple invocations.
The solution employed in the APIs is:
It is possible that with PyArrow, since the representation of the data between Python and our C API is the same, but then still we must tell the Python GC that our batch writer objects references the PyArrow objects, otherwise we'll get into ownership issues if e.g. Python decides to garbage collect. I.e. this is tricky and needs to be considered carefully. You can read more about this over here: https://pybind11.readthedocs.io/en/stable/advanced/functions.html, especially the section "additional call policies".