lance_encoding

Module decoder


Utilities and traits for scheduling & decoding data

Reading data involves two steps: scheduling and decoding. The scheduling step is responsible for figuring out what data is needed and issuing the appropriate I/O requests. The decoding step is responsible for taking the loaded data and turning it into Arrow arrays.

§Scheduling

Scheduling is split into self::FieldScheduler and self::PageScheduler. There is one field scheduler for each output field, which may map to many columns of actual data. A field scheduler is responsible for figuring out the order in which pages should be scheduled. Field schedulers then delegate to page schedulers to figure out the I/O requests that need to be made for the page.

Page schedulers also create the decoders that will be used to decode the scheduled data.
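
As a rough sketch, the split can be pictured with simplified traits like the ones below (illustrative only; the real FieldScheduler and PageScheduler traits carry more state, such as the I/O scheduler and request priorities):

use std::ops::Range;

/// A single I/O request: a byte range within the file (simplified).
pub struct IoRequest {
    pub byte_range: Range<u64>,
}

/// Simplified stand-in for PageScheduler: given the rows wanted from
/// this page, produce the I/O requests that must be issued.
pub trait PageScheduler {
    fn schedule_ranges(&self, rows: &[Range<u64>]) -> Vec<IoRequest>;
}

/// Simplified stand-in for FieldScheduler: one per output field, possibly
/// spanning many columns. It decides page order and delegates the actual
/// byte ranges to the page schedulers for those pages.
pub trait FieldScheduler {
    fn schedule_rows(&self, rows: &[Range<u64>]) -> Vec<IoRequest>;
}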

§Decoding

Decoders are split into self::PhysicalPageDecoder and self::LogicalPageDecoder. Note that both physical and logical decoding happen on a per-page basis. There is no concept of a “field decoder” or “column decoder”.

The physical decoders handle lower level encodings. They have a few advantages:

  • They do not need to decode into an Arrow array and so they don’t need to be shoehorned into the Arrow type system (e.g. Arrow doesn’t have a bit-packed type; we can use variable-length binary, but that is awkward)
  • They can decode into an existing allocation. This can allow for “page bridging”. If we are trying to decode into a batch of 1024 rows and the rows 0..1024 are spread across two pages then we can avoid a memory copy by allocating once and decoding each page into the outer allocation. (note: page bridging is not actually implemented yet)

However, there are some limitations for physical decoders:

  • They are constrained to a single column
  • The API is more complex

The logical decoders are designed to map one or more columns of Lance data into an Arrow array.

Typically, a “logical encoding” will have both a logical decoder and a field scheduler. Meanwhile, a “physical encoding” will have a physical decoder but no corresponding field scheduler.
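
In sketch form, the two decoder shapes differ roughly as follows (simplified; not the exact trait definitions, which also deal with buffer sizing and waiting for data to arrive):

use arrow_array::ArrayRef;

/// Simplified stand-in for a physical page decoder: decode `num_rows`
/// rows starting at `row_offset` into an existing, caller-owned buffer.
/// Decoding into an outer allocation is what would enable page bridging.
pub trait PhysicalPageDecoder {
    fn decode_into(&self, row_offset: u64, num_rows: u64, dest: &mut [u8]);
}

/// Simplified stand-in for a logical page decoder: turn one or more
/// columns of loaded Lance data into a finished Arrow array.
pub trait LogicalPageDecoder {
    fn decode_batch(&mut self, num_rows: u64) -> ArrayRef;
}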

§General notes

Encodings are typically nested into each other to form a tree. The top of the tree is the user requested schema. Each field in that schema is assigned to one top-level logical encoding. That encoding can then contain other logical encodings or physical encodings. Physical encodings can also contain other physical encodings.

So, for example, a single field in the Arrow schema might have the type List<u64>.

The encoding tree could then be:

root: List (logical encoding)

  • indices: Primitive (logical encoding)
    • column: Basic (physical encoding)
      • validity: Bitmap (physical encoding)
      • values: RLE (physical encoding)
        • runs: Value (physical encoding)
        • values: Value (physical encoding)
  • items: Primitive (logical encoding)
    • column: Basic (physical encoding)
      • values: Value (physical encoding)

Note that, in this example, root.items.column does not have a validity because there were no nulls in the page.
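
A toy model of such a tree (purely illustrative; the crate does not actually represent encodings as a closed enum like this) makes the nesting explicit:

/// Toy model of an encoding tree (illustrative only).
pub enum Encoding {
    // Logical encodings
    List { indices: Box<Encoding>, items: Box<Encoding> },
    Primitive { column: Box<Encoding> },
    // Physical encodings
    Basic { validity: Option<Box<Encoding>>, values: Box<Encoding> },
    Bitmap,
    Rle { runs: Box<Encoding>, values: Box<Encoding> },
    Value,
}

/// The List<u64> example above, spelled out as a tree.
pub fn example_tree() -> Encoding {
    Encoding::List {
        indices: Box::new(Encoding::Primitive {
            column: Box::new(Encoding::Basic {
                validity: Some(Box::new(Encoding::Bitmap)),
                values: Box::new(Encoding::Rle {
                    runs: Box::new(Encoding::Value),
                    values: Box::new(Encoding::Value),
                }),
            }),
        }),
        items: Box::new(Encoding::Primitive {
            column: Box::new(Encoding::Basic {
                validity: None, // no nulls in this page
                values: Box::new(Encoding::Value),
            }),
        }),
    }
}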

§Multiple buffers or multiple columns?

Note that there are many different ways we can write encodings. For example, we might store primitive fields in a single column with two buffers (one for validity and one for values).

On the other hand, we could also store a primitive field as two different columns: one that yields a non-nullable boolean array and one that yields a non-nullable array of items. We could then combine these two arrays into a single array where the boolean array is the validity bitmap (see the sketch after this list). There are a few subtle differences between the approaches:

  • Storing things as multiple buffers within the same column is generally more efficient and easier to schedule. For example, in-batch coalescing is very easy but can only be done on data that is in the same page.
  • When things are stored in multiple columns you have to worry about their pages not being in sync. In our previous validity / values example this means we might have to do some memory copies to get the validity and values arrays to be the same length at decode time.
  • When things are stored in a single column, projection is impossible. For example, if we tried to store all the struct fields in a single column with lots of buffers then we wouldn’t be able to read back individual fields of the struct.
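
For the two-column variant, the final combining step could look like the following (a sketch using the arrow-rs arrow_array and arrow_buffer crates, not lance_encoding code):

use arrow_array::{Array, Int32Array};
use arrow_buffer::{BooleanBuffer, NullBuffer, ScalarBuffer};

fn main() {
    // One column decoded into a non-nullable boolean (validity) array...
    let validity = BooleanBuffer::from_iter([true, false, true, true]);
    // ...and one column decoded into a non-nullable array of items.
    let values = ScalarBuffer::<i32>::from(vec![1, 0, 3, 4]);

    // Combine them into a single nullable array where the boolean
    // array becomes the null bitmap.
    let array = Int32Array::new(values, Some(NullBuffer::new(validity)));
    assert_eq!(array.null_count(), 1);
}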

The fixed size list decoding is an interesting example because it is actually both a physical encoding and a logical encoding. A fixed size list of a physical encoding is, itself, a physical encoding (e.g. a fixed size list of doubles). However, a fixed size list of a logical encoding is a logical encoding (e.g. a fixed size list of structs).

§The scheduling loop

Reading a Lance file involves both scheduling and decoding. It’s generally expected that these will run as two separate threads.


                                   I/O PARALLELISM
                      Issues
                      Requests   ┌─────────────────┐
                                 │                 │        Wait for
                      ┌──────────►   I/O Service   ├─────►  Enough I/O ◄─┐
                      │          │                 │        For batch    │
                      │          └─────────────────┘             │3      │
                      │                                          │       │
                      │                                          │       │2
┌─────────────────────┴─┐                              ┌─────────▼───────┴┐
│                       │                              │                  │Poll
│       Batch Decode    │ Decode tasks sent via channel│   Batch Decode   │1
│       Scheduler       ├─────────────────────────────►│   Stream         ◄─────
│                       │                              │                  │
└─────▲─────────────┬───┘                              └─────────┬────────┘
      │             │                                            │4
      │             │                                            │
      └─────────────┘                                   ┌────────┴────────┐
 Caller of schedule_range                Buffer polling │                 │
 will be scheduler thread                to achieve CPU │ Decode Batch    ├────►
 and schedule one decode                 parallelism    │ Task            │
 task (and all needed I/O)               (thread per    │                 │
 per logical page                         batch)        └─────────────────┘

The scheduling thread will work through the file from the start to the end as quickly as possible. Data is scheduled one page at a time in a row-major fashion. For example, imagine we have a file with the following page structure:

Score (Float32)     | C0P0 |
Id (16-byte UUID)   | C1P0 | C1P1 | C1P2 | C1P3 |
Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |

This would be quite common as each of these pages has the same number of bytes. Let’s pretend each page is 1MiB and so there are 256Ki rows of data. Each page of Score has 256Ki rows. Each page of Id has 64Ki rows. Each page of Vector has 256 rows. The scheduler would then schedule in the following order:

C0 P0, C1 P0, C2 P0, C2 P1, … (253 pages omitted), C2 P255, C1 P1, C2 P256, … (254 pages omitted), C2 P511, C1 P2, C2 P512, … (254 pages omitted), C2 P767, C1 P3, C2 P768, … (254 pages omitted), C2 P1023

This is the ideal scheduling order because it means we can decode complete rows as quickly as possible. Note that the scheduler thread does not need to wait for I/O at any point. As soon as it starts, it will schedule one page of I/O after another until it has scheduled the entire file’s worth of I/O. This is slightly different from other file readers, which have “row group parallelism” and will typically only schedule X row groups’ worth of reads at a time.
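
One way to reproduce that ordering (a hypothetical helper, not part of the crate): repeatedly schedule the next page of whichever column has the fewest rows scheduled so far, breaking ties in column order.

/// Hypothetical helper (not lance_encoding API): given each column's
/// (rows_per_page, num_pages), return the (column, page) schedule order.
fn schedule_order(columns: &[(u64, u64)]) -> Vec<(usize, u64)> {
    let mut scheduled_rows = vec![0u64; columns.len()];
    let mut next_page = vec![0u64; columns.len()];
    let mut order = Vec::new();
    loop {
        // Pick the column that is furthest behind in scheduled rows
        // (ties go to the earlier column) and still has pages left.
        let candidate = (0..columns.len())
            .filter(|&i| next_page[i] < columns[i].1)
            .min_by_key(|&i| (scheduled_rows[i], i));
        let Some(col) = candidate else { break };
        order.push((col, next_page[col]));
        scheduled_rows[col] += columns[col].0;
        next_page[col] += 1;
    }
    order
}

fn main() {
    // Score: 1 page of 256Ki rows; Id: 4 pages of 64Ki; Vector: 1024 pages of 256.
    let order = schedule_order(&[(256 * 1024, 1), (64 * 1024, 4), (256, 1024)]);
    assert_eq!(&order[..3], &[(0, 0), (1, 0), (2, 0)][..]);
    assert_eq!(order.last(), Some(&(2, 1023)));
}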

In the near future there will be a backpressure mechanism, and the scheduler may need to pause if the compute falls behind.

§Indirect I/O

Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded the file. This happens when we have variable sized list data. In that case the scheduling task for that page will only schedule the first part of the read (loading the list offsets). It will then immediately spawn a new tokio task to wait for that I/O and decode the list offsets. That follow-up task is not part of the scheduling loop or the decode loop. It is a free task. Once the list offsets are decoded we submit a follow-up I/O task. This task is scheduled at a high priority because the decoder is going to need it soon.
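
A sketch of the pattern (hypothetical names, with a oneshot channel standing in for the I/O service):

use tokio::sync::oneshot;

/// Hypothetical sketch of indirect I/O for a variable-size list page.
/// The scheduler only schedules the offsets read, then spawns a free
/// task (outside both loops) to finish scheduling once offsets arrive.
fn schedule_list_page(
    offsets_io: oneshot::Receiver<Vec<u64>>,
    submit_high_priority: impl FnOnce(std::ops::Range<u64>) + Send + 'static,
) {
    tokio::spawn(async move {
        // Wait for the offsets read to land and decode the offsets.
        let offsets = offsets_io.await.expect("I/O service dropped");
        // Only now do we know which item bytes are needed; submit that
        // read at high priority because the decoder will want it soon.
        submit_high_priority(offsets[0]..*offsets.last().unwrap());
    });
}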

§The decode loop

As soon as the scheduler starts we can start decoding. Each time we schedule a page we push a decoder for that page’s data into a channel. The decode loop (BatchDecodeStream) reads from that channel. Each time it receives a decoder it waits until the decoder has all of its data. Then it grabs the next decoder. Once it has enough loaded decoders to complete a batch worth of rows it will spawn a “decode batch task”.

These decode batch tasks perform the actual CPU work of decoding the loaded data into Arrow arrays. This may involve significant CPU processing, like decompression or arithmetic, in order to restore the data to its correct in-memory representation.
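
The shape of that loop, in sketch form (hypothetical types; the real BatchDecodeStream tracks considerably more state):

use tokio::sync::{mpsc, oneshot};

/// Hypothetical stand-in for a scheduled page decoder: `io_done`
/// resolves with the loaded bytes once all of the page's I/O lands.
struct PageDecoder {
    num_rows: u64,
    io_done: oneshot::Receiver<Vec<u8>>,
}

/// Simplified shape of the decode loop: pull decoders off the channel,
/// wait for their data, and spawn a decode batch task for the CPU work
/// once a full batch worth of rows has been loaded.
async fn decode_loop(mut decoders: mpsc::Receiver<PageDecoder>, batch_size: u64) {
    let mut loaded = Vec::new();
    let mut loaded_rows = 0;
    while let Some(page) = decoders.recv().await {
        // Wait until this page's I/O has actually landed.
        let data = page.io_done.await.expect("I/O service dropped");
        loaded_rows += page.num_rows;
        loaded.push(data);
        if loaded_rows >= batch_size {
            // Hand the CPU-bound work to a separate task. (A real
            // implementation would carry rows beyond the batch size
            // over to the next batch rather than resetting.)
            let batch = std::mem::take(&mut loaded);
            loaded_rows = 0;
            tokio::task::spawn_blocking(move || decode_batch(batch));
        }
    }
}

fn decode_batch(_pages: Vec<Vec<u8>>) {
    // Decompression, arithmetic, building Arrow arrays, etc.
}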

§Batch size

The BatchDecodeStream is configured with a batch size. This does not need to have any relation to the page size(s) used to write the data. This keeps our compute work completely independent of our I/O work. We suggest using small batch sizes:

  • Batches should fit in CPU cache (at least L3)
  • More batches means more opportunity for parallelism
  • The “batch overhead” is very small in Lance compared to other formats because it has no relation to the way the data is stored.
