Arrow: a columnar in-memory data format for C++

2024/9/27 9:29:51

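Advantages of Apache Arrow:
    High-performance data processing: Arrow uses a columnar in-memory layout, which suits analytical and query workloads especially well. The values of a column are stored contiguously, so they can be processed efficiently in batches with fewer CPU cache misses, which speeds up processing.
    Zero-copy data sharing: Because every Arrow implementation uses the same memory layout, different systems and processes can share in-memory data directly (for example through memory-mapped Arrow IPC files or the Arrow C data interface) without copying it. This matters for data-intensive applications because it saves both memory and CPU time.
    Cross-language compatibility: Arrow is a cross-language development platform with implementations for C++, Java, Python and many other languages, which makes it easier for software components written in different languages to interoperate.
    Standardized data format: Arrow defines a single specification for its data format, so data can move between systems without conversion, reducing the cost and complexity of data transformation.
    Faster big-data processing: When integrated with frameworks and libraries such as Spark and pandas, Arrow can significantly speed up data loading, processing and analysis; for example, enabling Arrow in PySpark has been reported to make data processing up to 53 times faster.
    Broad adoption: Arrow is used by many data processing tools and libraries, such as pandas, Parquet, Drill and Spark, which together form a large ecosystem.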
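The layout and zero-copy points above can be illustrated without Arrow itself. The following is a minimal, self-contained sketch in plain C++; all type and function names are hypothetical and are not Arrow API. It stores the same records row-wise and column-wise, sums one column in each layout, and creates a zero-copy slice that shares the column buffer instead of copying it. Real Arrow arrays add validity bitmaps, alignment rules and a cross-language specification on top of this basic idea.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

// Row-oriented layout ("array of structs"): all fields of one record sit together.
struct RowRecord {
    int64_t id;
    double price;
    std::string name;
};

// Column-oriented layout ("struct of arrays"): each column is one contiguous buffer.
// Illustrates the idea behind Arrow arrays; these are not Arrow's classes.
struct ColumnTable {
    std::shared_ptr<std::vector<int64_t>> id;
    std::shared_ptr<std::vector<double>> price;
    std::shared_ptr<std::vector<std::string>> name;
};

// Zero-copy view: shares ownership of the column buffer and stores only an
// offset and a length (similar in spirit to arrow::Array::Slice).
struct DoubleSlice {
    std::shared_ptr<const std::vector<double>> buffer;
    std::size_t offset;
    std::size_t length;
    double operator[](std::size_t i) const { return (*buffer)[offset + i]; }
};

ColumnTable to_columns(const std::vector<RowRecord>& rows) {
    ColumnTable t{std::make_shared<std::vector<int64_t>>(),
                  std::make_shared<std::vector<double>>(),
                  std::make_shared<std::vector<std::string>>()};
    for (const auto& r : rows) {
        t.id->push_back(r.id);
        t.price->push_back(r.price);
        t.name->push_back(r.name);
    }
    return t;
}

// Row layout: summing one field strides over whole records, so the unused
// id and name fields are pulled through the cache as well.
double sum_price_rows(const std::vector<RowRecord>& rows) {
    double s = 0.0;
    for (const auto& r : rows) s += r.price;
    return s;
}

// Column layout: the same sum is a sequential scan of one dense buffer.
double sum_price_columns(const ColumnTable& t) {
    double s = 0.0;
    for (double p : *t.price) s += p;
    return s;
}

// Creates a view over rows [offset, offset + length) without copying any element.
DoubleSlice slice_price(const ColumnTable& t, std::size_t offset, std::size_t length) {
    assert(offset + length <= t.price->size());
    return DoubleSlice{t.price, offset, length};
}
```

For example, with the rows {1, 1.5, "John"}, {2, 2.5, "Tom"} and {3, 3.0, "Susan"}, both sums are 7.0, and slice_price(t, 1, 2) reads 2.5 and 3.0 directly out of the original price buffer.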
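Disadvantages of Apache Arrow:
    Memory consumption: Columnar storage can need more memory than row storage, especially for sparse data or wide tables, because every column gets its own contiguous buffers, and in a fixed-width column a null slot still occupies the full value width (plus one validity bit).
    Not suited to every workload: For workloads that frequently access individual records at random or update them, Arrow's columnar storage can be less efficient than traditional row storage; Arrow arrays are also immutable once built, so an update means building new arrays.
    Learning curve: New users may need some time to understand Arrow's data structures and APIs, especially if they are used to other data processing models.
    Ecosystem maturity: Although the Arrow ecosystem is growing quickly, support and tooling may still be thin or immature in some specialized fields or niche technology stacks.
    Implementation complexity: Using Arrow efficiently can involve complex memory management and optimization strategies, which can make development harder in some cases.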
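To make the memory point concrete, here is a rough estimate of the bytes one fixed-width column occupies under the Arrow columnar format: a value buffer of length × width bytes, in which null slots still take their full width, plus an optional validity bitmap of one bit per slot. This is a back-of-the-envelope sketch that ignores metadata and allocator overhead; the function names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// Rounds a byte count up to a multiple of `alignment` (the Arrow format
// recommends padding buffers to a multiple of 8 or 64 bytes).
int64_t pad_to(int64_t bytes, int64_t alignment) {
    assert(alignment > 0);
    return (bytes + alignment - 1) / alignment * alignment;
}

// Approximate bytes used by one fixed-width, Arrow-style column of `length` slots.
// Null slots still occupy `value_width` bytes in the value buffer; the
// validity bitmap (1 bit per slot) can be omitted when there are no nulls.
int64_t fixed_width_column_bytes(int64_t length, int64_t value_width,
                                 bool has_nulls, int64_t alignment) {
    int64_t values = pad_to(length * value_width, alignment);
    int64_t bitmap = has_nulls ? pad_to((length + 7) / 8, alignment) : 0;
    return values + bitmap;
}
```

For a column of 1,000,000 int64 slots of which only 1% are non-null, fixed_width_column_bytes(1000000, 8, true, 1) still returns 8,125,000 bytes, the same as for a fully populated column; a sparse representation could store only the non-null entries.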


// The compute module must be enabled when Arrow itself is built
// (ARROW_COMPUTE=ON); defining a macro in user code has no effect.
#include <arrow/api.h>
#include <arrow/array/concatenate.h>
#include <arrow/compute/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <arrow/json/api.h>
#include <arrow/pretty_print.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>

#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

template <typename T>
using numbuildT = arrow::NumericBuilder<T>;


struct ArrowUtil {
    static arrow::Status read_csv(char const* file_name, std::shared_ptr<arrow::Table>& tb);
    static arrow::Status read_ipc(char const* file_name, std::shared_ptr<arrow::Table>& tb);
    static arrow::Status read_parquet(char const* file_name, std::shared_ptr<arrow::Table>& tb);
    static arrow::Status read_json(char const* file_name, std::shared_ptr<arrow::Table>& tb);

    static arrow::Status write_ipc(arrow::Table const& tb, char const* file_name);
    static arrow::Status write_parquet(arrow::Table const& tb, char const* file_name);

    // Flatten a ChunkedArray (a CSV column may be split into several chunks)
    // into one contiguous Array. arrow::Concatenate copies the values into a
    // single set of buffers and preserves nulls. The template parameters are
    // unused and kept only so that existing call sites still compile.
    // compute::CallFunction also accepts ChunkedArray arguments directly,
    // so flattening is optional before calling compute functions.
    template <typename T, typename buildT, typename arrayT>
    inline static std::shared_ptr<arrow::Array> chunked_array_to_array(std::shared_ptr<arrow::ChunkedArray> const& array_a) {
        // ValueOrDie() aborts the process on error (for example a column with no chunks).
        return arrow::Concatenate(array_a->chunks()).ValueOrDie();
    }


    // Copy a numeric ChunkedArray into a std::vector<T>.
    // Null slots are copied as whatever value happens to be stored in them
    // (undefined by the format); check IsNull(j) first if the column may contain nulls.
    template <typename T, typename arrayT>
    inline static std::vector<T> chunked_array_to_vector(std::shared_ptr<arrow::ChunkedArray> const& array_a) {
        std::vector<T> values;
        values.reserve(array_a->length());
        for (auto const& chunk : array_a->chunks()) {
            auto typed = std::static_pointer_cast<arrayT>(chunk);
            for (int64_t j = 0; j < typed->length(); ++j) {
                values.push_back(typed->Value(j));
            }
        }
        return values;
    }

    // Copy a string ChunkedArray into a std::vector<std::string>.
    // StringArray::Value(j) returns a std::string_view that is NOT null-terminated,
    // so building a std::string from Value(j).data() would read past the end of
    // the value; GetString(j) copies exactly the bytes of slot j.
    inline static std::vector<std::string> chunked_array_to_str_vector(std::shared_ptr<arrow::ChunkedArray> const& array_a) {
        std::vector<std::string> values;
        values.reserve(array_a->length());
        for (auto const& chunk : array_a->chunks()) {
            auto str_arr = std::static_pointer_cast<arrow::StringArray>(chunk);
            for (int64_t j = 0; j < str_arr->length(); ++j) {
                values.push_back(str_arr->GetString(j));
            }
        }
        return values;
    }


    // Flatten a string ChunkedArray into one contiguous StringArray,
    // preserving nulls. ValueOrDie() aborts the process on error.
    inline static std::shared_ptr<arrow::Array> chunked_array_to_str_array(std::shared_ptr<arrow::ChunkedArray> const& array_a) {
        return arrow::Concatenate(array_a->chunks()).ValueOrDie();
    }
};


arrow::Status ArrowUtil::read_csv(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
    ARROW_ASSIGN_OR_RAISE(auto input_file,
        arrow::io::ReadableFile::Open(file_name));
    ARROW_ASSIGN_OR_RAISE(auto csv_reader,
        arrow::csv::TableReader::Make(
            arrow::io::default_io_context(), input_file,
            arrow::csv::ReadOptions::Defaults(),
            arrow::csv::ParseOptions::Defaults(),
            arrow::csv::ConvertOptions::Defaults()));
    ARROW_ASSIGN_OR_RAISE(auto table, csv_reader->Read());
    tb = table;
    return arrow::Status::OK();
}

arrow::Status ArrowUtil::read_ipc(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
    ARROW_ASSIGN_OR_RAISE(auto input_file,
        arrow::io::ReadableFile::Open(file_name));

    ARROW_ASSIGN_OR_RAISE(auto ipc_reader, arrow::ipc::RecordBatchFileReader::Open(input_file));

    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
    batches.reserve(ipc_reader->num_record_batches());

    for (int i = 0; i < ipc_reader->num_record_batches(); ++i) {
        ARROW_ASSIGN_OR_RAISE(auto a_record, ipc_reader->ReadRecordBatch(i));
        batches.emplace_back(std::move(a_record));
    }

    ARROW_ASSIGN_OR_RAISE(tb, arrow::Table::FromRecordBatches(ipc_reader->schema(), std::move(batches)));

    return arrow::Status::OK();
}

arrow::Status ArrowUtil::read_parquet(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
    ARROW_ASSIGN_OR_RAISE(auto infile,
        arrow::io::ReadableFile::Open(file_name, arrow::default_memory_pool()));

    // This function returns arrow::Status, so errors are propagated with the
    // ARROW_* macros rather than the exception-throwing PARQUET_* macros.
    std::unique_ptr<parquet::arrow::FileReader> reader;
    ARROW_RETURN_NOT_OK(
        parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
    ARROW_RETURN_NOT_OK(reader->ReadTable(&tb));
    return arrow::Status::OK();
}

arrow::Status ArrowUtil::read_json(char const* file_name, std::shared_ptr<arrow::Table>& tb) {
    ARROW_ASSIGN_OR_RAISE(auto infile,
        arrow::io::ReadableFile::Open(file_name, arrow::default_memory_pool()));

    // The Arrow JSON reader expects newline-delimited JSON (one object per line).
    ARROW_ASSIGN_OR_RAISE(auto reader,
        arrow::json::TableReader::Make(arrow::default_memory_pool(), infile,
            arrow::json::ReadOptions::Defaults(),
            arrow::json::ParseOptions::Defaults()));

    ARROW_ASSIGN_OR_RAISE(auto res_tb, reader->Read());
    tb = res_tb;
    return arrow::Status::OK();
}

arrow::Status ArrowUtil::write_ipc(arrow::Table const& tb, char const* file_name) {

    ARROW_ASSIGN_OR_RAISE(auto output_file,
        arrow::io::FileOutputStream::Open(file_name));
    ARROW_ASSIGN_OR_RAISE(auto batch_writer,
        arrow::ipc::MakeFileWriter(output_file, tb.schema()));
    ARROW_RETURN_NOT_OK(batch_writer->WriteTable(tb));
    ARROW_RETURN_NOT_OK(batch_writer->Close());

    return arrow::Status::OK();
}

arrow::Status ArrowUtil::write_parquet(arrow::Table const& tb, char const* file_name) {
    ARROW_ASSIGN_OR_RAISE(auto outfile, arrow::io::FileOutputStream::Open(file_name));
    // The last argument is the maximum number of rows per RowGroup in the
    // Parquet file. Normally you would choose a much larger value; a small one
    // is used here so that the example produces multiple RowGroups.
    ARROW_RETURN_NOT_OK(
        parquet::arrow::WriteTable(tb, arrow::default_memory_pool(), outfile, 3));
    return arrow::Status::OK();
}



void testReadCSV() {
    // Read a CSV file
    char const* csv_path = "./test.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    assert(tb_.num_rows() == 2);
}

void testWriteIpc() {
    // Read a CSV file and write it out as an Arrow IPC file
    char const* csv_path = "./test.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }
    auto const& tb_ = *tb;

    char const* ipc_path = "./test_dst.arrow";
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto write_res = ArrowUtil::write_ipc(tb_, ipc_path);
    assert(write_res.ok());
}

void testReadIPC() {
    // Read an Arrow IPC file
    char const* ipc_path = "./test_dst.arrow";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_ipc(ipc_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    assert(tb_.num_rows() == 2);
}


void testWriteParquet() {
    // Read a CSV file and write it out as a Parquet file
    char const* csv_path = "./test.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }
    auto const& tb_ = *tb;

    char const* write_parquet_path = "./test_dst.parquet";
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto write_res = ArrowUtil::write_parquet(tb_, write_parquet_path);
    assert(write_res.ok());
}


void testReadParquet() {
    // Read a Parquet file
    char const* parquet_path = "./test_dst.parquet";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_parquet(parquet_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    assert(tb_.num_rows() == 2);
}

void testReadJson() {
    // Read a JSON file
    char const* json_path = "./test.json";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_json(json_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    assert(tb_.num_rows() == 2);
}

void testComputeGreater() {
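    // Compare two int columns element-wise (int1 > int2) with the "greater" function
    char const* csv_path = "./comp_gt.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }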
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("int1");
    auto array_b = tb_.GetColumnByName("int2");

    auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a);
    auto array_b_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_b);

    // ValueOrDie() aborts on error instead of dereferencing a failed Result.
    arrow::Datum compared_datum =
        arrow::compute::CallFunction("greater", { array_a_res, array_b_res }).ValueOrDie();
    auto array_a_gt_b_compute = compared_datum.make_array();

    arrow::PrettyPrint(*array_a_gt_b_compute, {}, &std::cerr);

    auto schema =
        arrow::schema({ arrow::field("int1", arrow::int64()), arrow::field("int2", arrow::int64()),
                       arrow::field("a>b? (arrow)", arrow::boolean()) });

    std::shared_ptr<arrow::Table> my_table = arrow::Table::Make(
        schema, { array_a_res, array_b_res, array_a_gt_b_compute }, tb_.num_rows());

    arrow::PrettyPrint(*my_table, {}, &std::cerr);
}

void testComputeMinMax() {
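    // Compute the minimum and maximum of column int1
    char const* csv_path = "./comp_gt.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }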
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("int1");
    auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a);

    arrow::compute::ScalarAggregateOptions scalar_aggregate_options;
    scalar_aggregate_options.skip_nulls = false;

    arrow::Datum min_max =
        arrow::compute::CallFunction("min_max", { array_a_res }, &scalar_aggregate_options).ValueOrDie();

    // Unpack the struct scalar result (a two-field {"min", "max"} scalar)
    auto min_value = min_max.scalar_as<arrow::StructScalar>().value[0];
    auto max_value = min_max.scalar_as<arrow::StructScalar>().value[1];

    assert(min_value->ToString() == "1");
    assert(max_value->ToString() == "8");
}

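// Minimal stand-ins for the GoogleTest macros so the cases below compile
// without gtest: GTEST_TEST(RWTests, ComputeMean) defines a plain function
// RWTests_ComputeMean(), which has to be called explicitly.
#define GTEST_TEST(a, b) void a##_##b()
#define ASSERT_EQ(a, b) assert((a) == (b))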

GTEST_TEST(RWTests, ComputeMean) {
    // Compute the mean of column int1
    char const* csv_path = "../data/comp_gt.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("int1");
    auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a);

    arrow::compute::ScalarAggregateOptions scalar_aggregate_options;
    scalar_aggregate_options.skip_nulls = false;

    arrow::Datum mean =
        arrow::compute::CallFunction("mean", { array_a_res }, &scalar_aggregate_options).ValueOrDie();

    auto const& mean_value = mean.scalar_as<arrow::Scalar>();

    ASSERT_EQ(mean_value.ToString(), "4.5");
}

GTEST_TEST(RWTests, ComputeAdd) {
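    // Add 3 to every value of the first column (int1)
    char const* csv_path = "../data/comp_gt.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }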
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("int1");
    auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a);

    std::shared_ptr<arrow::Scalar> increment = std::make_shared<arrow::Int64Scalar>(3);

    // "add" takes no FunctionOptions (ScalarAggregateOptions belong to aggregate
    // functions such as "mean"); use "add_checked" to get an error on overflow.
    arrow::Datum add = arrow::compute::CallFunction("add", { array_a_res, increment }).ValueOrDie();
    std::shared_ptr<arrow::Array> incremented_array = add.make_array();
    arrow::PrettyPrint(*incremented_array, {}, &std::cerr);
}


GTEST_TEST(RWTests, ComputeAddArray) {
    // Add columns int1 and int2 element-wise
    char const* csv_path = "../data/comp_gt.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto array_a = tb_.GetColumnByName("int1");
    auto array_a_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_a);

    auto array_b = tb_.GetColumnByName("int2");
    auto array_b_res = ArrowUtil::chunked_array_to_array<int64_t, numbuildT<arrow::Int64Type>, arrow::Int64Array>(array_b);

    // "add" takes no FunctionOptions.
    arrow::Datum add = arrow::compute::CallFunction("add", { array_a_res, array_b_res }).ValueOrDie();
    std::shared_ptr<arrow::Array> sum_array = add.make_array();
    arrow::PrettyPrint(*sum_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, ComputeStringEqual) {
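    // Check element-wise whether columns s1 and s2 are equal
    char const* csv_path = "../data/comp_s_eq.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }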
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);

    auto array_a = tb_.GetColumnByName("s1");
    auto array_a_res = ArrowUtil::chunked_array_to_str_array(array_a);

    auto array_b = tb_.GetColumnByName("s2");
    auto array_b_res = ArrowUtil::chunked_array_to_str_array(array_b);

    // Comparison functions such as "equal" take no FunctionOptions.
    arrow::Datum eq_ = arrow::compute::CallFunction("equal", { array_a_res, array_b_res }).ValueOrDie();
    std::shared_ptr<arrow::Array> eq_array = eq_.make_array();
    arrow::PrettyPrint(*eq_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, ComputeCustom) {
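    // Compare the strings element by element with a hand-written loop
    char const* csv_path = "../data/comp_s_eq.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }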
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto arr1 = tb_.GetColumnByName("s1");
    auto arr2 = tb_.GetColumnByName("s2");
    auto v1 = ArrowUtil::chunked_array_to_str_vector(arr1);
    auto v2 = ArrowUtil::chunked_array_to_str_vector(arr2);
    for (std::size_t i = 0; i < v1.size(); ++i) {
        if (v1[i] != v2[i]) {
            std::cerr << v1[i] << "!=" << v2[i] << "\n";
        }
    }
}

GTEST_TEST(RWTests, ComputeCustomDbl) {
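    // Compare double values with a hand-written loop (exact comparison)
    char const* csv_path = "../data/custom_dbl.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }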
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto arr1 = tb_.GetColumnByName("dbl1");
    auto arr2 = tb_.GetColumnByName("dbl2");
    auto v1 = ArrowUtil::chunked_array_to_vector<double, arrow::DoubleArray>(arr1);
    auto v2 = ArrowUtil::chunked_array_to_vector<double, arrow::DoubleArray>(arr2);
    for (std::size_t i = 0; i < v1.size(); ++i) {
        if (v1[i] != v2[i]) {
            std::cerr << v1[i] << "!=" << v2[i] << "\n";
        }
    }
}

GTEST_TEST(RWTests, ComputeEqualDbl) {
    // Compare double values with the "equal" function
    char const* csv_path = "../data/custom_dbl.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);
    auto arr1 = tb_.GetColumnByName("dbl1");
    auto arr2 = tb_.GetColumnByName("dbl2");

    auto dbl_arr1 = ArrowUtil::chunked_array_to_array<double, numbuildT<arrow::DoubleType>, arrow::DoubleArray>(arr1);
    auto dbl_arr2 = ArrowUtil::chunked_array_to_array<double, numbuildT<arrow::DoubleType>, arrow::DoubleArray>(arr2);

    // Comparison functions such as "equal" take no FunctionOptions.
    arrow::Datum eq_ = arrow::compute::CallFunction("equal", { dbl_arr1, dbl_arr2 }).ValueOrDie();
    std::shared_ptr<arrow::Array> eq_array = eq_.make_array();
    arrow::PrettyPrint(*eq_array, {}, &std::cerr);
}

GTEST_TEST(RWTests, StrStartsWith) {
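    // Check which values in column s1 start with "Zha"
    char const* csv_path = "../data/comp_s_eq.csv";
    std::shared_ptr<arrow::Table> tb;
    if (auto st = ArrowUtil::read_csv(csv_path, tb); !st.ok()) {
        std::cerr << st.ToString() << "\n";
        return;
    }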
    auto const& tb_ = *tb;
    arrow::PrettyPrint(tb_, {}, &std::cerr);

    auto array_a = tb_.GetColumnByName("s1");
    auto array_a_res = ArrowUtil::chunked_array_to_str_array(array_a);

    arrow::compute::MatchSubstringOptions options("Zha");

    arrow::Datum eq_ = arrow::compute::CallFunction("starts_with", { array_a_res }, &options).ValueOrDie();
    std::shared_ptr<arrow::Array> eq_array = eq_.make_array();
    arrow::PrettyPrint(*eq_array, {}, &std::cerr);
}



using arrow::Int32Builder;
using arrow::Int64Builder;
using arrow::DoubleBuilder;
using arrow::StringBuilder;

struct row_data {
    int32_t col1;
    int64_t col2;
    double col3;
    std::string col4;
};  // one row of data

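// Print the error and return EXIT_FAILURE from the enclosing function
// if `expr` does not evaluate to an OK arrow::Status.
#define EXIT_ON_FAILURE(expr)                      \
  do {                                             \
    arrow::Status status_ = (expr);                \
    if (!status_.ok()) {                           \
      std::cerr << status_.message() << std::endl; \
      return EXIT_FAILURE;                         \
    }                                              \
  } while (0)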

arrow::Status CreateTable(const std::vector<struct row_data>& rows, std::shared_ptr<arrow::Table>* table) {

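    // arrow::default_memory_pool() returns the process-wide default allocator
    // (jemalloc or mimalloc when Arrow is built with them), which may be able
    // to grow buffers in place and so makes the builders more efficient.
    arrow::MemoryPool* pool = arrow::default_memory_pool();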

    Int32Builder col1_builder(pool);
    Int64Builder col2_builder(pool);
    DoubleBuilder col3_builder(pool);
    StringBuilder col4_builder(pool);

    // Loop over the existing rows and append each field to its builder. Append()
    // can fail (for example if more memory cannot be allocated), so every returned
    // Status is checked.
    for (const row_data& row : rows) {
        ARROW_RETURN_NOT_OK(col1_builder.Append(row.col1));
        ARROW_RETURN_NOT_OK(col2_builder.Append(row.col2));
        ARROW_RETURN_NOT_OK(col3_builder.Append(row.col3));
        ARROW_RETURN_NOT_OK(col4_builder.Append(row.col4));
    }

    // Append one null to every column, so the last row is entirely null.
    ARROW_RETURN_NOT_OK(col1_builder.AppendNull());
    ARROW_RETURN_NOT_OK(col2_builder.AppendNull());
    ARROW_RETURN_NOT_OK(col3_builder.AppendNull());
    ARROW_RETURN_NOT_OK(col4_builder.AppendNull());

    std::shared_ptr<arrow::Array> col1_array;
    ARROW_RETURN_NOT_OK(col1_builder.Finish(&col1_array));
    std::shared_ptr<arrow::Array> col2_array;
    ARROW_RETURN_NOT_OK(col2_builder.Finish(&col2_array));
    std::shared_ptr<arrow::Array> col3_array;
    ARROW_RETURN_NOT_OK(col3_builder.Finish(&col3_array));
    std::shared_ptr<arrow::Array> col4_array;
    ARROW_RETURN_NOT_OK(col4_builder.Finish(&col4_array));

    std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
            arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()), arrow::field("col3", arrow::float64()),
            arrow::field("col4", arrow::utf8()) };

    auto schema = std::make_shared<arrow::Schema>(schema_vector);

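    // The resulting table can be passed to any function that works with Arrow
    // in-memory structures. It shares ownership of all the data it references,
    // so there is no risk of dangling references once we leave the scope in
    // which the table and its arrays were built.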
    *table = arrow::Table::Make(schema, { col1_array, col2_array, col3_array,col4_array });

    return arrow::Status::OK();
}

arrow::Status TableToVector(const std::shared_ptr<arrow::Table>& table,
    std::vector<struct row_data>* rows) {
    // Check that the table has the expected schema
    std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
            arrow::field("col1", arrow::int32()), arrow::field("col2", arrow::int64()), arrow::field("col3", arrow::float64()),
            arrow::field("col4", arrow::utf8()) };
    auto expected_schema = std::make_shared<arrow::Schema>(schema_vector);

    if (!expected_schema->Equals(*table->schema())) {
        // The table doesn't have the expected schema thus we cannot directly
        // convert it to our target representation.
        return arrow::Status::Invalid("Schemas are not matching!");
    }

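    // Get typed pointers to each column's data. This assumes every column has
    // exactly one chunk, which holds for tables built by CreateTable().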
    auto col1s =
        std::static_pointer_cast<arrow::Int32Array>(table->column(0)->chunk(0));
    auto col2s =
        std::static_pointer_cast<arrow::Int64Array>(table->column(1)->chunk(0));
    auto col3s =
        std::static_pointer_cast<arrow::DoubleArray>(table->column(2)->chunk(0));
    auto col4s =
        std::static_pointer_cast<arrow::StringArray>(table->column(3)->chunk(0));

    for (int64_t i = 0; i < table->num_rows(); i++) {
        if (col1s->IsNull(i)) {
            assert(i == 3);  // the fourth row is the null row appended by CreateTable()
        }
        else {
            int32_t col1 = col1s->Value(i);
            int64_t col2 = col2s->Value(i);
            double col3 = col3s->Value(i);
            std::string col4 = col4s->GetString(i);
            rows->push_back({ col1, col2, col3,col4 });
        }
    }

    return arrow::Status::OK();
}

// Convert between a row-oriented std::vector and a columnar arrow::Table
int testTableConvertSTL() {
    // Row-oriented input data
    std::vector<row_data> rows = {
            {1, 11,1.0, "John"}, {2, 22,2.0, "Tom"}, {3,33, 3.0,"Susan"} };

    std::shared_ptr<arrow::Table> table;
    EXIT_ON_FAILURE(CreateTable(rows, &table));

    std::vector<row_data> expected_rows;
    EXIT_ON_FAILURE(TableToVector(table, &expected_rows));
    std::cout << expected_rows.size() << std::endl;
    assert(rows.size() == expected_rows.size());
    return 0;
}

void test() {
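    // Build two int8 arrays (days, months) and one int16 array (years);
    // Finish() resets the builder, so it can be reused.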
    arrow::Int8Builder builder;
    arrow::Int16Builder int16builder;
    int8_t days_raw[5] = { 1, 12, 17, 23, 28 };
    int8_t months_raw[5] = { 1, 3, 5, 7, 1 };
    int16_t years_raw[5] = { 1990, 2000, 1995, 2000, 1995 };
    builder.AppendValues(days_raw, 5);
    std::shared_ptr<arrow::Array> days = builder.Finish().MoveValueUnsafe();    
    builder.AppendValues(months_raw, 5);
    std::shared_ptr<arrow::Array> months = builder.Finish().MoveValueUnsafe();    
    int16builder.AppendValues(years_raw, 5);
    std::shared_ptr<arrow::Array> years = int16builder.Finish().MoveValueUnsafe();

    // Define a custom schema for the table
    // Now, we want a RecordBatch, which has columns and labels for said columns.
    // This gets us to the 2d data structures we want in Arrow.
    // These are defined by schema, which have fields -- here we get both those object types
    // ready.
    std::shared_ptr<arrow::Field> field_day, field_month, field_year;
    std::shared_ptr<arrow::Schema> schema;

    // Every field needs its name and data type.
    field_day = arrow::field("Day", arrow::int8());
    field_month = arrow::field("Month", arrow::int8());
    field_year = arrow::field("Year", arrow::int16());

    // The schema can be built from a vector of fields, and we do so here.
    schema = arrow::schema({ field_day, field_month, field_year });

    // Print the RecordBatch
    // With the schema and Arrays full of data, we can make our RecordBatch! Here,
    // each column is internally contiguous. This is in opposition to Tables, which we'll
    // see next.
    std::shared_ptr<arrow::RecordBatch> rbatch;
    // The RecordBatch needs the schema, length for columns, which all must match,
    // and the actual data itself.
    rbatch = arrow::RecordBatch::Make(schema, days->length(), { days, months, years });
    std::cout << rbatch->ToString();
    /*
    Day:   [
        1,
        12,
        17,
        23,
        28
      ]
    Month:   [
        1,
        3,
        5,
        7,
        1
      ]
    Year:   [
        1990,
        2000,
        1995,
        2000,
        1995
      ]
    */

    // Wrap the array in a ChunkedArray built from an arrow::ArrayVector (a std::vector of arrays)
    arrow::ArrayVector day_vecs{days};
    std::shared_ptr<arrow::ChunkedArray> day_chunks =
        std::make_shared<arrow::ChunkedArray>(day_vecs);
    testTableConvertSTL();

    testReadCSV();
    /*
    col1: string
    col2: string
    col3: string
    ----
    col1:
      [
        [
          "val1",
          "val1"
        ]
      ]
    col2:
      [
        [
          "val2",
          "val2"
        ]
      ]
    col3:
      [
        [
          "val3",
          "val3"
        ]
      ]
    */
    testWriteIpc();
    testReadIPC();
    //testComputeGreater();
    //testComputeMinMax();
}

int main() {
    test();
    return 0;
}
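CreateTable() above appends one string per row to a StringBuilder and then a null. The following hedged sketch shows how such a utf8 column is laid out according to the Arrow columnar format's variable-size binary layout: a validity bitmap with least-significant-bit numbering, n + 1 int32 offsets, and one contiguous data buffer. It is written in plain C++ with hypothetical names so it runs without Arrow, and it ignores buffer padding and alignment as well as the 2^31-byte limit of int32 offsets (the large_utf8 type uses int64 offsets instead).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

// Simplified teaching model of the three buffers of an Arrow utf8 column;
// not Arrow's implementation.
struct StringColumn {
    std::vector<uint8_t> validity;  // bit i set => slot i is non-null (LSB bit order)
    std::vector<int32_t> offsets;   // value i is data[offsets[i], offsets[i + 1])
    std::string data;               // all string bytes stored back to back
    int64_t length = 0;
    int64_t null_count = 0;

    void append(const std::optional<std::string>& value) {
        if (offsets.empty()) offsets.push_back(0);
        if (length % 8 == 0) validity.push_back(0);  // start a new bitmap byte
        if (value) {
            validity.back() |= static_cast<uint8_t>(1u << (length % 8));
            data += *value;
        } else {
            ++null_count;  // a null occupies a zero-length slot here
        }
        offsets.push_back(static_cast<int32_t>(data.size()));
        ++length;
    }

    bool is_valid(int64_t i) const {
        assert(i >= 0 && i < length);
        return (validity[i / 8] >> (i % 8)) & 1;
    }

    std::string_view value(int64_t i) const {
        assert(i >= 0 && i < length);
        auto begin = static_cast<std::size_t>(offsets[i]);
        auto end = static_cast<std::size_t>(offsets[i + 1]);
        return std::string_view(data).substr(begin, end - begin);
    }
};
```

Appending "John", "Tom", "Susan" and then a null (the same values as the col4 column built in CreateTable) yields offsets {0, 4, 7, 12, 12}, data "JohnTomSusan" and a single validity byte of 0x07; the null slot has zero length and its validity bit is 0.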

Compute Functions — Apache Arrow v17.0.0

GitHub - apache/arrow: Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing



Source: http://www.coloradmin.cn/o/1949426.html (user-contributed article; the views expressed are the author's own).
