Utilities and traits for scheduling & decoding data
Reading data involves two steps: scheduling and decoding. The scheduling step is responsible for figuring out what data is needed and issuing the appropriate I/O requests. The decoding step is responsible for taking the loaded data and turning it into Arrow arrays.
§Scheduling
Scheduling is split into [self::FieldScheduler] and [self::PageScheduler].
There is one field scheduler for each output field, which may map to many
columns of actual data. A field scheduler is responsible for figuring out
the order in which pages should be scheduled. Field schedulers then delegate
to page schedulers to figure out the I/O requests that need to be made for
the page.
Page schedulers also create the decoders that will be used to decode the scheduled data.
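As a rough sketch of this division of labor, consider the following toy code. The names `PageInfo`, `schedule_page`, and `schedule_field` are invented for illustration and are not the crate's actual API; the real traits are async and far richer.

```rust
// A toy page: `rows` rows stored at bytes `offset..offset + size` in the file.
#[derive(Clone, Copy, Debug)]
struct PageInfo {
    rows: u64,
    offset: u64,
    size: u64,
}

// Hypothetical stand-in for a page scheduler: turn one page into the byte
// range that must be read from disk.
fn schedule_page(page: &PageInfo) -> (u64, u64) {
    (page.offset, page.offset + page.size)
}

// Hypothetical stand-in for a field scheduler: decide the order in which
// pages are scheduled (here, simply front-to-back) and delegate each page
// to the page scheduler to produce the actual I/O requests.
fn schedule_field(pages: &[PageInfo]) -> Vec<(u64, u64)> {
    pages.iter().map(schedule_page).collect()
}
```

The real field scheduler may cover many columns of actual data and interleave pages from them; this sketch only shows the delegation shape.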
§Decoding
Decoders are split into [self::PhysicalPageDecoder] and [self::LogicalPageDecoder]. Note that both physical and logical decoding happens on a per-page basis. There is no concept of a “field decoder” or “column decoder”.
The physical decoders handle lower level encodings. They have a few advantages:
- They do not need to decode into an Arrow array, so they are not constrained to the Arrow type system (e.g. Arrow doesn’t have a bit-packed type; we could approximate one with variable-length binary, but that is awkward)
- They can decode into an existing allocation. This can allow for “page bridging”. If we are trying to decode into a batch of 1024 rows and the rows 0..1024 are spread across two pages then we can avoid a memory copy by allocating once and decoding each page into the outer allocation. (note: page bridging is not actually implemented yet)
However, there are some limitations for physical decoders:
- They are constrained to a single column
- The API is more complex
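The “decode into an existing allocation” idea can be sketched in plain Rust. Page bridging is noted above as not yet implemented, so this is purely illustrative, and `decode_page_into` and `bridge` are hypothetical names:

```rust
// Hypothetical physical-decode step: a real decoder would decompress or
// bit-unpack here; this sketch just copies bytes into the destination slice.
fn decode_page_into(page: &[u8], dest: &mut [u8]) {
    dest.copy_from_slice(page);
}

// Page bridging sketch: rows for one batch span several pages, so we make a
// single up-front allocation and decode each page into its own disjoint
// slice of that allocation, avoiding a copy per page.
fn bridge(pages: &[&[u8]]) -> Vec<u8> {
    let total: usize = pages.iter().map(|p| p.len()).sum();
    let mut out = vec![0u8; total];
    let mut start = 0;
    for page in pages {
        decode_page_into(page, &mut out[start..start + page.len()]);
        start += page.len();
    }
    out
}
```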
The logical decoders are designed to map one or more columns of Lance data into an Arrow array.
Typically, a “logical encoding” will have both a logical decoder and a field scheduler. Meanwhile, a “physical encoding” will have a physical decoder but no corresponding field scheduler.
§General notes
Encodings are typically nested into each other to form a tree. The top of the tree is the user requested schema. Each field in that schema is assigned to one top-level logical encoding. That encoding can then contain other logical encodings or physical encodings. Physical encodings can also contain other physical encodings.
So, for example, a single field in the Arrow schema might have a List type. The encoding tree could then be:
root: List (logical encoding)
- indices: Primitive (logical encoding)
  - column: Basic (physical encoding)
    - validity: Bitmap (physical encoding)
    - values: RLE (physical encoding)
      - runs: Value (physical encoding)
      - values: Value (physical encoding)
- items: Primitive (logical encoding)
  - column: Basic (physical encoding)
    - values: Value (physical encoding)
Note that, in this example, root.items.column does not have a validity because there were no nulls in the page.
§Multiple buffers or multiple columns?
Note that there are many different ways we can write encodings. For example, we might store primitive fields in a single column with two buffers (one for validity and one for values).
On the other hand, we could also store a primitive field as two different columns. One that yields a non-nullable boolean array and one that yields a non-nullable array of items. Then we could combine these two arrays into a single array where the boolean array is the bitmap. There are a few subtle differences between the approaches:
- Storing things as multiple buffers within the same column is generally more efficient and easier to schedule. For example, in-batch coalescing is very easy but can only be done on data that is in the same page.
- When things are stored in multiple columns you have to worry about their pages not being in sync. In our previous validity / values example this means we might have to do some memory copies to get the validity array and the values array to the same length at decode time.
- When things are stored in a single column, projection is impossible. For example, if we tried to store all the struct fields in a single column with lots of buffers then we wouldn’t be able to read back individual fields of the struct.
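The second approach (a non-nullable boolean column combined with a non-nullable values column at decode time) can be sketched in plain Rust, with `Vec<Option<T>>` standing in for a nullable Arrow array. The function name `combine` is invented for illustration:

```rust
// Combine a non-nullable validity column and a non-nullable values column
// into a single nullable array. This is the "memory copy" work that may be
// needed when the two columns' pages are not in sync.
fn combine(validity: &[bool], values: &[i32]) -> Vec<Option<i32>> {
    validity
        .iter()
        .zip(values)
        .map(|(&valid, &value)| if valid { Some(value) } else { None })
        .collect()
}
```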
The fixed size list decoding is an interesting example because it is actually both a physical encoding and a logical encoding. A fixed size list of a physical encoding is, itself, a physical encoding (e.g. a fixed size list of doubles). However, a fixed size list of a logical encoding is a logical encoding (e.g. a fixed size list of structs).
§The scheduling loop
Reading a Lance file involves both scheduling and decoding. It’s generally expected that these will run as two separate threads.
                     I/O PARALLELISM
     Issues
     Requests        ┌─────────────────┐
   ┌────────────────►│   I/O Service   ├────► Wait for enough ◄──┐
   │                 └─────────────────┘      I/O for batch      │
   │                                                │3           │
   │                                                │            │2
┌──┴────────────────┐                      ┌────────▼────────────┴──┐
│                   │  Decode tasks        │                        │ Poll
│   Batch Decode    ├─────────────────────►│      Batch Decode      │◄──── 1
│    Scheduler      │  sent via channel    │        Stream          │
└───────▲───────┬───┘                      └───────────┬────────────┘
        │       │                                      │4
        └───────┘                                      ▼
 Caller of schedule_range                   ┌────────────────────────┐
 will be the scheduler thread               │   Decode Batch Task    ├────►
 and will schedule one decode               │  (buffer polling to    │
 task (and all needed I/O)                  │   achieve CPU          │
 per logical page                           │   parallelism; one     │
                                            │   thread per batch)    │
                                            └────────────────────────┘
The scheduling thread will work through the file from the start to the end as quickly as possible. Data is scheduled one page at a time in a row-major fashion. For example, imagine we have a file with the following page structure:
Score (Float32) | C0P0 |
Id (16-byte UUID) | C1P0 | C1P1 | C1P2 | C1P3 |
Vector (4096 bytes) | C2P0 | C2P1 | C2P2 | C2P3 | .. | C2P1023 |
This would be quite common as each of these pages has the same number of bytes. Let’s pretend each page is 1MiB and so there are 256Ki rows of data. Each page of Score has 256Ki rows, each page of Id has 64Ki rows, and each page of Vector has 256 rows. The scheduler would then schedule in the following order:
C0 P0, C1 P0, C2 P0, C2 P1, … (254 pages omitted), C2 P255, C1 P1, C2 P256, … (254 pages omitted), C2 P511, C1 P2, C2 P512, … (254 pages omitted), C2 P767, C1 P3, C2 P768, … (254 pages omitted), C2 P1023
This is the ideal scheduling order because it means we can decode complete rows as quickly as possible. Note that the scheduler thread does not need to wait for I/O to happen at any point. As soon as it starts it will start scheduling one page of I/O after another until it has scheduled the entire file’s worth of I/O. This is slightly different than other file readers which have “row group parallelism” and will typically only schedule X row groups worth of reads at a time.
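The row-major order above can be reproduced with a small simulation. This is not Lance's actual scheduler, just a sketch of the rule it follows: always schedule the page that begins at the earliest unscheduled row, breaking ties left-to-right across columns.

```rust
// Simulate row-major page scheduling. `rows_per_page[c]` is the number of
// rows in each page of column `c`; returns (column, page) pairs in the
// order they would be scheduled.
fn schedule_order(rows_per_page: &[u64], total_rows: u64) -> Vec<(usize, u64)> {
    let ncols = rows_per_page.len();
    let mut next_row = vec![0u64; ncols]; // first row not yet scheduled, per column
    let mut next_page = vec![0u64; ncols]; // next page index, per column
    let mut order = Vec::new();
    loop {
        // Pick the column whose next page starts at the smallest row.
        // Strict `<` keeps the leftmost column on ties (C0 before C1 before C2).
        let mut best: Option<usize> = None;
        for c in 0..ncols {
            if next_row[c] < total_rows
                && best.map_or(true, |b| next_row[c] < next_row[b])
            {
                best = Some(c);
            }
        }
        let c = match best {
            Some(c) => c,
            None => break, // every column fully scheduled
        };
        order.push((c, next_page[c]));
        next_row[c] += rows_per_page[c];
        next_page[c] += 1;
    }
    order
}
```

Running this with the example above (256Ki rows; 256Ki, 64Ki, and 256 rows per page) yields C0 P0, C1 P0, then 256 Vector pages before C1 P1, and so on, ending at C2 P1023.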
In the near future there will be a backpressure mechanism and so it may need to stop/pause if the compute falls behind.
§Indirect I/O
Regrettably, there are times where we cannot know exactly what data we need until we have partially decoded the file. This happens when we have variable sized list data. In that case the scheduling task for that page will only schedule the first part of the read (loading the list offsets). It will then immediately spawn a new tokio task to wait for that I/O and decode the list offsets. That follow-up task is not part of the scheduling loop or the decode loop. It is a free task. Once the list offsets are decoded we submit a follow-up I/O task. This task is scheduled at a high priority because the decoder is going to need it soon.
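The shape of this indirect read can be sketched with a plain OS thread standing in for the spawned tokio task. Everything here (`indirect_schedule`, the in-memory "disk") is invented for illustration:

```rust
use std::sync::mpsc;
use std::thread;

// Sketch of indirect I/O for variable-size lists. The scheduling loop only
// issues the read for the list offsets and moves on; a spawned follow-up
// task waits for that I/O, decodes the offsets, and only then issues the
// (high-priority) read for the list items themselves.
fn indirect_schedule(offsets_on_disk: Vec<u32>, items_on_disk: Vec<u8>) -> Vec<u8> {
    let (tx, rx) = mpsc::channel();
    // Follow-up task: not part of the scheduling loop or the decode loop.
    let follow_up = thread::spawn(move || {
        // "Wait" for the offsets I/O to complete.
        let offsets: Vec<u32> = rx.recv().unwrap();
        // Decode the offsets, then "issue" the item read (modeled as a slice).
        let start = offsets[0] as usize;
        let end = *offsets.last().unwrap() as usize;
        items_on_disk[start..end].to_vec()
    });
    // The scheduling loop's only job for this page: kick off the offsets read.
    tx.send(offsets_on_disk).unwrap();
    follow_up.join().unwrap()
}
```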
§The decode loop
As soon as the scheduler starts we can start decoding. Each time we schedule a page we push a decoder for that page’s data into a channel. The decode loop (BatchDecodeStream) reads from that channel. Each time it receives a decoder it waits until the decoder has all of its data. Then it grabs the next decoder. Once it has enough loaded decoders to complete a batch worth of rows it will spawn a “decode batch task”.
These batch decode tasks perform the actual CPU work of decoding the loaded data into Arrow arrays. This may involve significant CPU processing like decompression or arithmetic in order to restore the data to its correct in-memory representation.
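A toy version of this loop, with `Vec<i32>` pages standing in for loaded page decoders and `decode_loop` an invented name, shows how rows accumulate across pages until a batch is full:

```rust
use std::sync::mpsc::Receiver;

// Toy decode loop: page "decoders" arrive on a channel in scheduling order;
// the loop waits for enough rows to fill a batch, then emits the batch.
// Note that one batch may span two pages (the "page bridging" scenario).
fn decode_loop(rx: Receiver<Vec<i32>>, batch_size: usize) -> Vec<Vec<i32>> {
    let mut batches = Vec::new();
    let mut pending: Vec<i32> = Vec::new();
    for page in rx {
        pending.extend(page);
        // Emit full batches; leftover rows wait for the next page.
        while pending.len() >= batch_size {
            let rest = pending.split_off(batch_size);
            batches.push(std::mem::replace(&mut pending, rest));
        }
    }
    // Final partial batch once the channel closes.
    if !pending.is_empty() {
        batches.push(pending);
    }
    batches
}
```

In the real crate each batch is handed to a spawned decode batch task rather than collected into a Vec; this sketch only shows the accumulation logic.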
§Batch size
The BatchDecodeStream is configured with a batch size. This does not need to have any relation to the page size(s) used to write the data. This keeps our compute work completely independent of our I/O work. We suggest using small batch sizes:
- Batches should fit in CPU cache (at least L3)
- More batches means more opportunity for parallelism
- The “batch overhead” is very small in Lance compared to other formats because it has no relation to the way the data is stored.
Structs§
- A stream that takes scheduled jobs and generates decode tasks from them.
- Metadata describing a column in a file
- The core decoder strategy handles all the various Arrow types
- The scheduler for decoding batches
- A filter expression to apply to the data
- Determining the priority of a list request is tricky. We want the priority to be the top-level row. So if we have a list<list<…>> and each outer list has 10 rows and each inner list has 5 rows then the priority of the 100th item is 1 because it is the 5th item in the 10th inner list of the second row.
- A task to decode data into an Arrow array
- Metadata describing a page in a file
- Contains the context for a scheduler
- A simple priority scheme for top-level fields with no parent repetition
- A stream that takes scheduled jobs and generates decode tasks from them.
Enums§
- Top-level encoding message for a page. Wraps both the legacy pb::ArrayEncoding and the newer pb::PageLayout
Traits§
- A trait for tasks that decode data into an Arrow array
- A scheduler for a field’s worth of data
- A decoder for a field’s worth of data
- A scheduler for single-column encodings of primitive data
- A decoder for single-column encodings of primitive data (this includes fixed size lists of primitive data)
- A trait to control the priority of I/O
Functions§
- Decodes a batch of data from an in-memory structure created by crate::encoder::encode_batch
- Launches a scheduler on a dedicated (spawned) task and creates a decoder to decode the scheduled data and returns the decoder as a stream of record batches.