Description
Describe the bug
This bug is not specific to PyPy, but because of the way CPython handles set instances, it will only be reproducible in CPython with a large number of partitions. In PyPy (RPython) this bug can be easily reproduced with a few partitions, such as 20 in my case.
It is hard to provide a small example for this issue, because it requires a Kafka server, but the issue itself is not hard to reproduce and happens all the time.
Kafka provides aiokafka with a list of partitions in arbitrary order, and when aiokafka computes a partition from the supplied key in producer.send(), it relies on the iteration order of a set instance, which is a hash table and has no defined order. Here's the chain of calls where this occurs.
producer.send(topic, value, key) calls AIOKafkaProducer._partition, which obtains all partitions as follows:
all_partitions = list(self._metadata.partitions_for_topic(topic))
ClusterMetadata.partitions_for_topic, in turn, returns all partitions in a set, like this:
return set(self._partitions[topic].keys())
The partition is then determined by the call below, where the partitioner uses the Murmur2 hash of the key, reduced modulo the number of partitions, as an index into all_partitions, whose order is undefined at this point because the list was constructed from a set.
return self._partitioner(serialized_key, all_partitions, available)
The reason people running this code in CPython don't see the bug is the way CPython hashes numbers, which makes a set look ordered. This is just an illusion, and it only works for small numbers.
This results in the same key being associated with different partitions, which may happen right away or days later, depending on how Kafka supplies partitions to aiokafka and how many rebalancing events take place. This breaks the continuity of the state attributed to those keys.
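The effect can be shown without a Kafka server. The sketch below is not aiokafka's code: pick_partition is a hypothetical stand-in for the partitioner, and zlib.crc32 replaces Murmur2 purely for illustration. It only demonstrates that indexing into a list by hash value gives a different partition as soon as the list order changes.

```python
import zlib


def pick_partition(key: bytes, all_partitions: list[int]) -> int:
    # Hypothetical stand-in for the partitioner: hash the key, reduce it
    # modulo the partition count, and use the result as a list index.
    # zlib.crc32 is used instead of Murmur2 only to keep this self-contained.
    idx = zlib.crc32(key) % len(all_partitions)
    return all_partitions[idx]


ordered = list(range(20))
# Same partitions, rotated by one position, so every index maps elsewhere.
rotated = ordered[1:] + ordered[:1]

key = b"user-42"
print(pick_partition(key, ordered), pick_partition(key, rotated))
```

The two printed values differ even though both lists contain exactly the same partitions.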
Expected behaviour
There are two issues here:
- The same key must always end up in the same partition.
- The partition computed outside of the producer.send() call must be the same as the one computed for the same key inside producer.send(), given that the same partitioner is being used in both cases.
Environment (please complete the following information):
- aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): '0.12.0'
- Kafka Broker version (kafka-topics.sh --version): 4.0.0
- Other information (Confluent Cloud version, etc.): aiokafka is running on Ubuntu 22.04, Python 3.10, PyPy
Reproducible example
It is hard to set up an example for this issue because it requires Kafka and a few consumers. However, the fact that set instances have no defined order is well documented, and this bug is clearly visible in the code even without a reproducible case.
You can see how CPython sets are different from PyPy sets if you run this code in PyPy and in CPython in the link provided below.
import random

l1: list[int] = list(range(20))
# simulates how Kafka provides partitions
random.shuffle(l1)
# aiokafka performs this to obtain all_partitions
s: set[int] = set(l1)
l2: list[int] = list(s)
# will be shuffled in PyPy and will seem sorted in CPython
print(l2)
Choose Python 3.10 (PyPy 7.3.12) for PyPy and Python 3.11.2 for CPython. There's no CPython 3.10 option at this point, which is the version I'm running, but Python 3.11 shows the same behavior. You will see that CPython yields a sorted list of partitions, despite the shuffling, and PyPy produces a randomized list, which is what's happening in aiokafka running on PyPy.
It is worth noting that CPython's ordering is not the intended behavior, but rather a side effect of how CPython "hashes" small numbers, and it will not hold for larger partition sets.
Any time a set is converted into a list, it must be sorted, which may be expensive if done each time, but in general all_partitions just needs to be constructed using a sorted list, which will fix this bug.
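A minimal sketch of that idea follows. stable_partitions is a hypothetical helper name, not an existing aiokafka function. It assumes partition ids are plain integers and shows that sorting makes the resulting list independent of both the order in which Kafka supplied the partitions and the set implementation.

```python
import random


def stable_partitions(partition_ids) -> list[int]:
    # Hypothetical helper: sorting removes any dependence on the
    # iteration order of the set the ids were collected in.
    return sorted(partition_ids)


ids = list(range(20))
random.shuffle(ids)  # simulates Kafka supplying partitions in arbitrary order

all_partitions = stable_partitions(set(ids))
print(all_partitions)
```

To avoid sorting on every send, the sorted list could presumably be built once when the topic metadata changes and reused afterwards. Whether that fits aiokafka's metadata handling is an assumption here.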