Switch from protobuf to arrow #2013

Closed
felixwang9817 opened this issue Nov 9, 2021 · 29 comments
Labels: kind/bug, priority/p1, wontfix (This will not be worked on)

Comments

@felixwang9817
Collaborator

felixwang9817 commented Nov 9, 2021

Is your feature request related to a problem? Please describe.

Serialization costs for protobuf are very high.

Describe the solution you'd like

Switching to arrow would decrease serialization costs significantly. This issue tracks an investigation into the feasibility of switching from protobuf to arrow.

Describe alternatives you've considered

Additional context

See this document for the results of a detailed investigation into latency issues due to on-demand feature views, which prompted the observation that serialization costs for protobuf are extremely high.

@judahrand
Member

judahrand commented Nov 12, 2021

This has other advantages too, I believe, such as the ability to support certain data types much better and do away with Pandas dtypes. For example the ValueType.UNIX_TIMESTAMP type is currently pretty limiting.

It's been my experience that the weakest part of Feast is how typing is handled. I feel that the switch to Arrow, along with better typing, should be a top priority. It will be a fairly big change (possibly affecting the user-facing ValueType interface), so it feels like the sooner the better - break things while there are fewer users!

I'm happy to help with this if/when there is a plan!

@noomanee

I just want to share my experience with Feast. I used Redis as the online store, and convert_arrow_to_proto is quite a bottleneck. Writing 5 million rows with 30 columns took me more than an hour, and most of that time was spent converting arrow to proto. One thing I noticed is that this function seems to use a single core. Correct me if I am wrong.

@achals
Member

achals commented Nov 12, 2021

One thing to call out is that arrow IPC encoded values are generally larger than proto encoded values, for single rows of data. So this would mean that we would be increasing storage required in the online store.

I think this is generally a fair choice to make to optimize for time to materialize over storage.

@felixwang9817
Collaborator Author

@noomanee your results line up with the benchmarks that we've run on convert_arrow_to_proto. In our experience, converting a single feature takes ~1e-5 s, so converting 5 million rows * 30 columns should take ~1 hour. You're also correct that convert_arrow_to_proto runs on a single core.
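
As a rough back-of-envelope check (a sketch using the ~1e-5 s/feature figure above; the exact constant will vary by machine):

rows, cols, secs_per_feature = 5_000_000, 30, 1e-5
pure_conversion_s = rows * cols * secs_per_feature  # 1,500 s, i.e. ~25 minutes of pure per-feature conversion
# Python loop and proto-message construction overhead on top of this pushes the
# end-to-end time toward the ~1 hour observed above.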

@noomanee

Writing to Redis is quite fast in my case; I am not sure about other stores. Either way, I think we need to improve the convert_arrow_to_proto function somehow if we want to keep the benefits of proto-encoded values. Do you have any ideas on how we can improve this function?

@judahrand
Member

judahrand commented Nov 15, 2021

One thing to call out is that arrow IPC encoded values are generally larger than proto encoded values, for single rows of data. So this would mean that we would be increasing storage required in the online store.

I assume this is due to Arrow IPC encoding every message with its schema? So for single rows you have to decode schema + row?

I'm also not really convinced that switching to Arrow for serialization into the online store would be any quicker than convert_arrow_to_proto, as you'll still have to serialize every individual cell of the table separately, won't you? According to a very quick benchmark (below), this takes about 1e-5 seconds per feature, which is the same as convert_arrow_to_proto. I suspect deserialization and converting back to Arrow will be quicker, though?

Unless you serialize entire rows, but then you're going to have to retrieve entire rows at serve time? This would make the serialization 1-2 orders of magnitude faster but I'm not sure how this would affect online serving latency and memory consumption? From browsing the docs it seems like this is how the Datastore data model currently works but not how the SQLite or Redis models work?

Or am I entirely missing the intended use?

from typing import Dict, Generator, Iterable
import pyarrow as pa
import numpy as np


num_rows, num_cols = 200, 50
num_features = num_rows * num_cols
data = np.random.rand(num_cols, num_rows)
table = pa.Table.from_arrays(data, names=[str(_) for _ in range(num_cols)])


def serialize_features(table: pa.Table) -> Generator[Dict[str, bytes], None, None]:
    for row in table.to_batches(1):
        row =  pa.Table.from_batches([row])
        row_dict = {}
        for field in table.schema:
            feature = row.select([field.name])
            sink = pa.BufferOutputStream()
            with pa.ipc.new_file(sink, feature.schema) as writer:
                writer.write_table(feature)
            row_dict[field.name] = sink.getvalue().to_pybytes()
        yield row_dict


def deserialize_features(rows: Iterable[Dict[str, bytes]]) -> pa.Table:
    arrays = {}
    schema = {}
    for row in rows:
        for name, column in row.items():
            arrays[name] = arrays.get(name) or []
            with pa.ipc.open_file(column) as f:
                arrays[name] += [f.get_batch(i).column(name) for i in range(f.num_record_batches)]
                schema[name] = schema.get(name) or f.get_batch(0).schema

    arrays = {name: pa.chunked_array(data) for name, data in arrays.items()}
    return pa.Table.from_arrays([arrays[name] for name in schema], schema=pa.unify_schemas(list(schema.values())))

%timeit [row for row in serialize_features(table)]
r = [row for row in serialize_features(table)]
%timeit deserialize_features(r)


def serialize_rows(table: pa.Table) -> Generator[bytes, None, None]:
    schema = table.schema
    for row in table.to_batches(1):
        sink = pa.BufferOutputStream()
        with pa.ipc.new_file(sink, schema) as writer:
            writer.write_table(pa.Table.from_batches([row]))
        yield sink.getvalue().to_pybytes()


def deserialize_rows(rows: Iterable[bytes]) -> pa.Table:
    record_batches = []
    for file in rows:
        with pa.ipc.open_file(file) as f:
            record_batches += [f.get_batch(i) for i in range(f.num_record_batches)]
    return pa.Table.from_batches(record_batches)


%timeit [row for row in serialize_rows(table)]
r = [row for row in serialize_rows(table)]
%timeit deserialize_rows(r)
173 ms ± 2.65 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
157 ms ± 601 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
18.7 ms ± 146 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
19.4 ms ± 70.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

@judahrand
Member

I suppose additionally this also raises the question - what's more important? Low minimum latency or low maximum latency?

Serializing by cell will achieve the former; serializing by row will achieve the latter.

@judahrand
Member

judahrand commented Nov 15, 2021

From a little bit of experimentation this afternoon this seems to be the best compromise if using Arrow:

from typing import Dict, Generator, Iterable, List
import pyarrow as pa
import numpy as np


num_rows, num_cols = 200, 50
num_features = num_rows * num_cols
data = np.random.rand(num_cols, num_rows)
table = pa.Table.from_arrays(data, names=[str(_) for _ in range(num_cols)])

def serialize_rows(table: pa.Table) -> List[bytes]:
    return [row.serialize().to_pybytes() for row in table.to_batches(1)]

def deserialize_rows(rows: List[bytes], schema: pa.Schema) -> pa.Table:
    return pa.ipc.open_stream(b"".join([schema.serialize().to_pybytes()] + rows)).read_all()

%timeit serialize_rows(table)
rows = serialize_rows(table)
%timeit deserialize_rows(rows, table.schema)
11.1 ms ± 331 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
6.71 ms ± 182 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

The FeatureView Arrow schema could be stored in the Feast Registry, and then just the raw serialized Arrow data for an entire FeatureView can be written to an EntityKey in the online store. This means that the whole FeatureView would have to be deserialized regardless of how many features are requested. This may be acceptable, however, given that the per-feature-cell speed-up compared to Protobuf looks to be around 20x (4 ms vs 80 ms for 197 rows, 30 columns).

Again, there are other advantages of using Arrow for serialization, which include removing the need to maintain type conversions between Arrow <-> Pandas, Pandas <-> Python, Arrow <-> Python, Pandas <-> Proto, Arrow <-> Proto, and Python <-> Proto. The only type mappings needed would be OfflineStore -> Arrow.

@woop
Member

woop commented Nov 15, 2021

One thing to call out is that arrow IPC encoded values are generally larger than proto encoded values, for single rows of data. So this would mean that we would be increasing storage required in the onlinestore.

I think this is generally a fair choice to make to optimize for time to materialize over storage.

It's also worth noting that storage is something that can be optimized independently if we go with Arrow. We can relatively easily add compression (which would be off by default) that users can enable if they want to trade off latency/throughput for storage.
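
As a rough sketch of what opt-in compression could look like with pyarrow's IPC writer (serialize_row_compressed below is a hypothetical helper, not existing Feast code):

import pyarrow as pa

def serialize_row_compressed(batch: pa.RecordBatch, schema: pa.Schema) -> bytes:
    # pyarrow's IPC format supports buffer compression; "lz4" or "zstd" are accepted codecs.
    opts = pa.ipc.IpcWriteOptions(compression="lz4")
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, schema, options=opts) as writer:
        writer.write_batch(batch)
    return sink.getvalue().to_pybytes()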

@woop
Member

woop commented Nov 15, 2021

Unless you serialize entire rows, but then you're going to have to retrieve entire rows at serve time? This would make the serialization 1-2 orders of magnitude faster but I'm not sure how this would affect online serving latency and memory consumption? From browsing the docs it seems like this is how the Datastore data model currently works but not how the SQLite or Redis models work?

I think the default approach to implementing stores should be K/V at the feature view (row) level. Generally we've found this to be more performant for online stores where accessing individual features requires individual lookups. I think SQLite is implemented on a per-feature basis, so it will suffer from performance issues at scale.

That being said, I can also see a world eventually where online stores can have further functionality. For example we may allow users to optimize how data is stored/clustered/partitioned/indexed. So the storage model can deviate at the individual store level, but the way in which Feast discovers what functionality a store provides should be consistent (ideally through some contract).
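
To make the row-level K/V model concrete, a hedged sketch (the key format and field names below are made up for illustration, not Feast's actual online store layout):

import pyarrow as pa

# One entity's row for a feature view, stored as a single value under one key.
row = pa.record_batch(
    [pa.array([0.42]), pa.array([0.87])],
    names=["conv_rate", "acc_rate"],
)
key = b"my_project/driver_hourly_stats/driver_id=1001"  # hypothetical composite key
value = row.serialize().to_pybytes()                     # the whole row as one blob
online_store = {key: value}                              # stand-in for Redis/SQLite/etc.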

@woop
Member

woop commented Nov 15, 2021

I suppose additionally this also raises the question - what's more important? Low minimum latency or low maximum latency?

Serializing by cell will achieve the former; serializing by row will achieve the latter.

Almost certainly low maximum latency

@judahrand
Member

Unless you serialize entire rows, but then you're going to have to retrieve entire rows at serve time? This would make the serialization 1-2 orders of magnitude faster but I'm not sure how this would affect online serving latency and memory consumption? From browsing the docs it seems like this is how the Datastore data model currently works but not how the SQLite or Redis models work?

I think the default approach to implementing stores should be K/V at the feature view (row) level. Generally we've found this to be more performant for online stores where accessing individual features requires individual lookups. I think SQLite is implemented on a per-feature basis, so it will suffer from performance issues at scale.

That being said, I can also see a world eventually where online stores can have further functionality. For example we may allow users to optimize how data is stored/clustered/partitioned/indexed. So the storage model can deviate at the individual store level, but the way in which Feast discovers what functionality a store provides should be consistent (ideally through some contract).

In this case Arrow looks like it'll absolutely help the serialization + conversion latency (maybe also worth considering LZ4 compression?).

I think there may even be enough here to start work on an RFC + PR to remove the Value and ValueType protobufs and replace them with Arrow RecordBatches and Schemas, which can be merged into a FeatureView-level schema with the unify_schemas function in pyarrow.
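
For example, pyarrow can already merge per-feature (or per-source) schemas into a single FeatureView-level schema; the field names here are only illustrative:

import pyarrow as pa

s1 = pa.schema([("driver_id", pa.int64()), ("conv_rate", pa.float32())])
s2 = pa.schema([("driver_id", pa.int64()), ("acc_rate", pa.float32())])
feature_view_schema = pa.unify_schemas([s1, s2])
print(feature_view_schema.names)  # ['driver_id', 'conv_rate', 'acc_rate']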

@woop
Member

woop commented Nov 15, 2021

Unless you serialize entire rows, but then you're going to have to retrieve entire rows at serve time? This would make the serialization 1-2 orders of magnitude faster but I'm not sure how this would affect online serving latency and memory consumption? From browsing the docs it seems like this is how the Datastore data model currently works but not how the SQLite or Redis models work?

I think the default approach to implementing stores should be K/V at the feature view (row) level. Generally we've found this to be more performant for online stores where accessing individual features requires individual lookups. I think SQLite is implemented on a per-feature basis, so it will suffer from performance issues at scale.
That being said, I can also see a world eventually where online stores can have further functionality. For example we may allow users to optimize how data is stored/clustered/partitioned/indexed. So the storage model can deviate at the individual store level, but the way in which Feast discovers what functionality a store provides should be consistent (ideally through some contract).

In this case Arrow looks like it'll absolutely help the serialization + conversion latency (maybe also worth considering LZ4 compression?).

Agree, but it's not so much about improving latency (which would be a nice benefit). The primary benefit to me is that we need to maintain less code and make fewer decisions on how a particular data store's types map to/from our types.

I think there may even be enough here to start work on an RFC + PR to remove the Value and ValueType protobufs and replace them with Arrow RecordBatches and Schemas, which can be merged into a FeatureView-level schema with the unify_schemas function in pyarrow.

Agree. I think we need an RFC. This is a big change so I'd like folks to raise any concerns they have. We also need to talk through how exactly we will provide a migration path for users.

@pyalex
Collaborator

pyalex commented Nov 16, 2021

Although the benefits of easier conversion between pandas <-> feast (with arrow) are undeniable, using arrow as a row serializer for the online store feels very unnatural. In the code example provided by @judahrand, each serialized row is 2896 bytes, of which only 400 bytes are useful payload. So, echoing what @achals said, that's a 7x overhead, and it cannot simply be solved by compression on the database side. It will also increase traffic between the database and the feature server, and if we introduce compression on the feature server side, we'll simply lose all the time we gained with the proto->arrow replacement.

So if it's only for Pandas, maybe we should instead get rid of Pandas in latency-sensitive parts, like the transformation service? I believe this is what started this discussion?

Also, my 2 cents about convert_arrow_to_proto: it's most probably slow not because of Protobuf, but because it's two nested Python for loops and each feature (column) is being serialized individually (more function calls). I bet protobuf serialization accounts for less than 15-20% of the time this function spends.

@judahrand
Member

In the code example provided by @judahrand, each serialized row is 2896 bytes, of which only 400 bytes are useful payload.

For the final example above, using the streaming serialization without compression, each row is 3489 bytes; with LZ4 compression this comes down to 1289 bytes. I'll try to look into the equivalent Protobuf serialization size.

@judahrand
Member

if we introduce compression on the feature server side, we'll simply lose all the time we gained with the proto->arrow replacement.

Here is a quick comparison of serialization speed with and without LZ4 compression for 197 rows and 60 columns. The hit really isn't too bad and gets the row size down to 1297 bytes from 3840 bytes without compression. Would you agree this somewhat dispels this argument?

[screenshot: serialization timings with and without LZ4 compression]
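
For anyone wanting to reproduce the comparison, a sketch along these lines should work (exact sizes will vary with the pyarrow version and schema):

import numpy as np
import pyarrow as pa

table = pa.Table.from_arrays(
    [pa.array(np.random.rand(197)) for _ in range(60)],
    names=[str(i) for i in range(60)],
)

def row_size(compression=None) -> int:
    # Serialize a single-row RecordBatch with the given codec and report its size in bytes.
    opts = pa.ipc.IpcWriteOptions(compression=compression)
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema, options=opts) as writer:
        writer.write_batch(table.to_batches(max_chunksize=1)[0])
    return sink.getvalue().size

print(row_size(None), row_size("lz4"))  # uncompressed vs LZ4-compressed bytes per row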

@tsotnet
Collaborator

tsotnet commented Nov 16, 2021

I agree with @pyalex that the pyarrow format's main advantages lie in serializing large amounts of data (otherwise the overhead is too high). So it makes a lot of sense to use the pyarrow type system for defining features and when working with offline stores.

I don't think using pyarrow in online stores is a good idea - both the size of the data and the speed of serialization won't be close to the theoretical optimum, especially if this is going to be a long(er)-lived standard for the feature store. I imagine the optimal solution to be directly serializing the memory to the DB without additional metadata. Optionally, we could compress that if required. One such solution would be using flatbuffers https://google.github.io/flatbuffers/.

@judahrand
Member

judahrand commented Nov 16, 2021

Also, my 2 cents about convert_arrow_to_proto: it's most probably slow not because of Protobuf, but because it's two nested Python for loops and each feature (column) is being serialized individually (more function calls). I bet protobuf serialization accounts for less than 15-20% of the time this function spends.

I think you may well be correct on this; however, many of these function calls and loops are needed due to the type conversions between Feast's Protobuf types and the Python types that come from Arrow. All of these could be avoided if we stayed in Arrow, I think.

The screenshot below demonstrates that even in the best-case scenario of converting all the values from the Arrow table straight to Protobuf, knowing all their types in advance, Protobuf is slower than Arrow (3x so when also converting to record format rather than columnar). I will concede there are advantages to having each cell serialized rather than just each row (from a memory perspective). I'm not sure how we'd avoid serializing every individual cell when using Protobuf, though - which means the point about extra function calls is true but somewhat irrelevant?

This was done with 200 x 50 doubles.

[screenshot: best-case Protobuf vs Arrow serialization timings]
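
For reference, the Protobuf side of the comparison can be reproduced along these lines (a sketch assuming Feast's generated Value proto at feast.protos.feast.types.Value_pb2; timings will differ by machine):

import numpy as np
import pyarrow as pa
from feast.protos.feast.types.Value_pb2 import Value as ProtoValue

table = pa.Table.from_arrays(
    [pa.array(np.random.rand(200)) for _ in range(50)],
    names=[str(i) for i in range(50)],
)

# Best case: every cell is known to be a double, so it is serialized straight to a
# Value proto with no type lookup.
proto_columns = {
    name: [ProtoValue(double_val=v).SerializeToString() for v in values]
    for name, values in table.to_pydict().items()
}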

@judahrand
Member

judahrand commented Nov 16, 2021

Correction - you can convert to records in 13ms.

%timeit d = table.to_pydict(); [dict(zip(d.keys(), tuple([ProtoValue(double_val=col).SerializeToString() for col in row]))) for row in zip(*d.values())]
13.8 ms ± 9.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

@judahrand
Member

I don't think using pyarrow in online stores is a good idea - both the size of the data and the speed of serialization won't be close to the theoretical optimum, especially if this is going to be a long(er)-lived standard for the feature store. I imagine the optimal solution to be directly serializing the memory to the DB without additional metadata. Optionally, we could compress that if required. One such solution would be using flatbuffers https://google.github.io/flatbuffers/.

Arrow's IPC format is already implemented using FlatBuffers: https://github.com/apache/arrow/blob/master/format/Message.fbs and in fairness it does technically expose the individual columns. So it might in theory be possible to manually pull apart a serialized RecordBatch into column buffers.

@judahrand
Member

I think the important thing here is to keep a format which makes types consistent.

@judahrand
Member

judahrand commented Nov 17, 2021

Here's an interesting proof of concept of breaking up the Arrow buffers directly:

from functools import reduce
from operator import xor
from typing import Dict, List, Tuple

import pyarrow as pa
import numpy as np


num_rows, num_cols = 200, 50
num_features = num_rows * num_cols
data = np.random.rand(num_cols, num_rows)
table = pa.Table.from_arrays(data, names=[str(_) for _ in range(num_cols)])


def serialize_array_to_slots(array: pa.Array) -> List[Tuple[bool, pa.Buffer]]:
    validity_buf, values_buf = array.buffers()

    # Turn the validity bytes into a list of bits
    if validity_buf is None:
        validity_chunks = [True] * len(array)
    else:
        validity_int = int.from_bytes(validity_buf, byteorder='little')
        validity_chunks = [validity_int >> idx & 1 for idx in range(len(array))]

    # Separate each slot of the array into its own buffer (array.offset is in elements, so convert to bytes)
    byte_width = array.type.bit_width // 8
    value_chunks = [values_buf[i:i+byte_width] for i in range(array.offset * byte_width, values_buf.size, byte_width)]

    return list(zip(validity_chunks, value_chunks))


def deserialize_slots_to_array(type: pa.DataType, slots: List[Tuple[bool, pa.Buffer]]) -> pa.Array:
    validity_bits, value_chunks = tuple(zip(*[slot for slot in slots]))

    # Recreate the validity buffer from the individual validity bits
    validity_buf = pa.py_buffer(
        (
            reduce(xor, [bit << idx for idx, bit in enumerate(validity_bits)])
        ).to_bytes((len(validity_bits) + 7) // 8, byteorder='little')
    )
   
    # Join value chunks together into a single buffer
    values_buf = pa.py_buffer(b"".join(value_chunks))

    # Recreate the Arrow array
    return pa.Array.from_buffers(type, len(slots), [validity_buf, values_buf])


def serialize_table(table: pa.Table) -> Dict[pa.Field, List[Tuple[bool, pa.Buffer]]]:
    columns = {}
    for field in table.schema:
        for batch in table.to_batches():
            columns[field] = serialize_array_to_slots(batch.column(field.name))
    return columns


def deserialize_table(columns: Dict[pa.Field, List[Tuple[bool, pa.Buffer]]]) -> pa.Table:
    data = []
    for field, slots in columns.items():
        data.append(deserialize_slots_to_array(field.type, slots))
    schema = pa.schema(columns.keys())
    return pa.table(data, schema=schema)


%timeit serialize_table(table)
ser = serialize_table(table)
assert table == deserialize_table(ser)
%timeit deserialize_table(ser)
5.76 ms ± 136 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
2.41 ms ± 220 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

This little demo only works for the Fixed-size Primitive Layout but I don't see an obvious reason that a similar approach couldn't work for the other layouts with a bit more thought and complexity.

Does this seem a better (if more complicated) approach to you @tsotnet and @pyalex? It keeps the benefit of staying in an Arrow-compatible format and so avoids type conversion, serializes much more efficiently than serializing whole RecordBatches, and has the same advantage as the current Protobuf serialization of serializing each cell individually. Wrapping this up into a Protobuf or FlatBuffer will also be necessary, I think.

@judahrand
Member

judahrand commented Nov 21, 2021

Anyone else have any thoughts on this? I really do think this is the key thing stopping Feast from being a really solid project. The type system feels crucial to be able to claim to solve the online vs offline consistency problem.

Having examined the Arrow data structure I don't think that there is anything inherently wrong with using it to describe individual rows of a FeatureView. There is some redundant information stored but from a convenience and compatibility perspective this feels worth it if it is performant enough for Feast's needs. I'd encourage looking at:
https://github.com/apache/arrow/blob/2155d46494a340111453a9696e304dd5eca83919/format/Message.fbs#L133
https://github.com/apache/arrow/blob/2155d46494a340111453a9696e304dd5eca83919/format/Message.fbs#L83

For the simple case of non-list features, I agree that more data than strictly necessary gets serialized, but having tried to efficiently serialize more complicated list data while maintaining compatibility with Arrow, it is clear why a lot of the extra information is needed.

@adchia
Collaborator

adchia commented Nov 24, 2021

Agreed with @woop that we should probably have an RFC here to have a more organized discussion.

"Performant enough" is a bit ambiguous here. Ultimately, we'll probably want to have a test scenario that swaps in arrow and sees its impact in online store reading (probably with redis as the online store). I think we generally all agree that Arrow is probably better suited for representing types (though a lot of the complexity in Feast types also comes from mapping offline store types to Feast types, which would only be partially solved here), but are unsure on the performance implications.

FWIW, having larger payloads isn't inherently a bad thing. IIRC network latency doesn't really correlate with the size of the data being transferred (up to a point). The most latency-sensitive part of Feast that we need to address here is online feature retrieval, which I'd like to see a benchmark for (with vs. without arrow being stored in online stores).


@judahrand judahrand reopened this Nov 24, 2021
@judahrand
Member

Ultimately, we'll probably want to have a test scenario that swaps in arrow and sees its impact in online store

I've been having a look at this on the side for the last few days and it isn't super straightforward to swap everything over. If someone else with a bit more time to dump into it would like to give it a go I'd be really grateful.

I stumbled across feast-dev/feast-spark#48, which seemed somewhat relevant and is a previous example of suggesting that Feast store a whole FeatureView row per key.

@adchia
Collaborator

adchia commented Nov 24, 2021

Ok yeah, we'll likely be taking a deeper look into this very soon. It's definitely high on our priority list.

@judahrand
Member

judahrand commented Dec 16, 2021

Are there any more plans on this? In my opinion the (de)serialization cost to/from the online store should be the top priority.

@adchia adchia added the kind/bug label Jan 7, 2022
@stale

stale bot commented May 25, 2022

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the wontfix This will not be worked on label May 25, 2022
@stale stale bot closed this as completed Jun 12, 2022