refactor(p2p): implement PeerConnections [part 2/11] #1174
self.pubsub.publish(
    HathorEvents.NETWORK_PEER_CONNECTION_FAILED,
    peer=peer,
Removing this for simplicity as it's not used.
if endpoint.peer_id is not None and peer is not None:
    assert endpoint.peer_id == peer.id, 'the entrypoint peer_id does not match the actual peer_id'
This is an assertion because the only caller that passes a `peer` constructs the `endpoint` using the `peer.id`. In another PR this may be further improved so this assertion can be removed.
def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
    """Called when a peer disconnect."""
    self.connections.discard(protocol)
    if protocol in self.handshaking_peers:
        self.handshaking_peers.remove(protocol)
    if protocol._peer is not None:
        existing_protocol = self.connected_peers.pop(protocol.peer.id, None)
        if existing_protocol is None:
            # in this case, the connection was closed before it got to READY state
            return
        if existing_protocol != protocol:
            # this is the case we're closing a duplicate connection. We need to set the
            # existing protocol object back to connected_peers, as that connection is still ongoing.
            # A check for duplicate connections is done during PEER_ID state, but there's still a
            # chance it can happen if both connections start at the same time and none of them has
            # reached READY state while the other is on PEER_ID state
            self.connected_peers[protocol.peer.id] = existing_protocol
This logic was simplified by calling a specific method for each state.
def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
    """ When there are duplicate connections, determine which one should be dropped.

    We keep the connection initiated by the peer with larger id. A simple (peer_id1 > peer_id2)
    on the peer id string is used for this comparison.
    """
    assert protocol.peer is not None
    assert protocol.peer.id is not None
    assert protocol.my_peer.id is not None
    other_connection = self.connected_peers[protocol.peer.id]
    if bytes(protocol.my_peer.id) > bytes(protocol.peer.id):
        # connection started by me is kept
        if not protocol.inbound:
            # other connection is dropped
            return other_connection
        else:
            # this was started by peer, so drop it
            return protocol
    else:
        # connection started by peer is kept
        if not protocol.inbound:
            return protocol
        else:
            return other_connection
This was moved to `PeerConnections` and simplified.
else:
    # When the peer is the server part of the connection we don't have the full entrypoint description
    # So we can only validate the host from the protocol
    assert protocol.transport is not None
    connection_remote = protocol.transport.getPeer()
    connection_host = getattr(connection_remote, 'host', None)
    if connection_host is None:
        continue
    # Connection host has only the IP
    # So we must consider that the entrypoint could be in name format and we just validate the host
    if connection_host == entrypoint.host:
        return True
    test_mode = not_none(DifficultyAdjustmentAlgorithm.singleton).TEST_MODE
    result = await discover_dns(entrypoint.host, test_mode)
    if connection_host in [entrypoint.addr.host for entrypoint in result]:
I was able to remove this because before we only had `entrypoint` for outbound peers, but now we have `addr` for both inbound and outbound peers.
if protocol.entrypoint is not None and protocol.entrypoint.peer_id is not None:
    assert protocol.entrypoint.peer_id == peer.id
The protocol does not contain an `entrypoint` anymore, so this can be removed.
09ef898 to 497dac3
if bytes(self.peer1.id) > bytes(self.peer2.id):
    tr_dead = self.conn.tr1
    tr_dead_value = self.conn.peek_tr1_value()
    proto_alive = conn.proto2
    conn_alive = conn
elif conn.tr1.disconnecting or conn.tr2.disconnecting:
    conn_dead = conn
    conn_alive = self.conn
else:
    raise Exception('It should never happen.')
self._check_result_only_cmd(conn_dead.peek_tr1_value() + conn_dead.peek_tr2_value(), b'ERROR')
    tr_dead = conn.tr2
    tr_dead_value = conn.peek_tr2_value()
    proto_alive = self.conn.proto1
    conn_alive = self.conn

self._check_result_only_cmd(tr_dead_value, b'ERROR')
# at this point, the connection must be closing as the error was detected on READY state
self.assertIn(True, [conn_dead.tr1.disconnecting, conn_dead.tr2.disconnecting])
# check connected_peers
connected_peers = list(self.manager1.connections.connected_peers.values())
self.assertEquals(1, len(connected_peers))
self.assertIn(connected_peers[0], [conn_alive.proto1, conn_alive.proto2])
self.assertTrue(tr_dead.disconnecting)
# check ready_peers
ready_peers = list(self.manager1.connections.iter_ready_connections())
self.assertEquals(1, len(ready_peers))
self.assertEquals(ready_peers[0], proto_alive)
Since I refactored the method that chooses which connection to drop, I updated this test to actually test the specific connection that is dropped, which is important.
Before, it would just test that some connection is dropped, but not which exact connection is dropped.
@staticmethod
def _should_drop_new_connection(new_conn: HathorProtocol) -> bool:
    """
    When there are connections with duplicate PeerIds, determine which one should be dropped, the old or the new.
    Return True if we should drop the new connection, and False otherwise.

    The logic to determine this is `(my_peer_id > other_peer_id) XNOR new_conn.inbound`.
    """
    my_peer_is_larger = bytes(new_conn.my_peer.id) > bytes(new_conn.peer.id)
    return my_peer_is_larger == new_conn.inbound
I refactored this from `ConnectionsManager`, simplifying its logic to a single boolean operation. The old docstring and comments were incorrect, because they described a decision to drop either the connection initiated by us or the one initiated by the other peer.
This was incorrect because it's possible that both duplicate connections are initiated by the other peer, for example. Therefore the correct description is just choosing whether to drop the old connection or the new one, independent of who initiated them. `new_conn.inbound` is used in the boolean logic, keeping the exact same behavior as before, but it doesn't actually convey any meaning.
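As a quick sanity check of the claim that behavior is unchanged, the sketch below compares the nested-branch logic of the removed `get_connection_to_drop` with the XNOR form over all four input combinations. It is only an illustration: the two free functions and their boolean parameters are hypothetical stand-ins for the methods in the diff, with "returns `protocol`" in the old code translated to "drop the new connection".

```python
from itertools import product


def drop_new_nested(my_peer_is_larger: bool, inbound: bool) -> bool:
    """Nested-branch form, mirroring the removed method (hypothetical helper).

    Returns True when the new connection is the one to drop.
    """
    if my_peer_is_larger:
        # old code: outbound -> drop the other connection, inbound -> drop this one
        return inbound
    # old code: outbound -> drop this one, inbound -> drop the other connection
    return not inbound


def drop_new_xnor(my_peer_is_larger: bool, inbound: bool) -> bool:
    """XNOR form: equality of two booleans is exactly XNOR."""
    return my_peer_is_larger == inbound


def logic_is_equivalent() -> bool:
    """Compare both forms over the full truth table."""
    return all(
        drop_new_nested(larger, inbound) == drop_new_xnor(larger, inbound)
        for larger, inbound in product((False, True), repeat=2)
    )
```

Calling `logic_is_equivalent()` exercises every combination, which is feasible here because the decision depends on only two booleans.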
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #1174 +/- ##
==========================================
+ Coverage 84.85% 84.96% +0.10%
==========================================
Files 323 324 +1
Lines 25044 25101 +57
Branches 3846 3838 -8
==========================================
+ Hits 21252 21327 +75
Misses 3046 3046
+ Partials 746 728 -18
497dac3 to eee9bb0
eee9bb0 to 8fbf968
8fbf968 to 8ed5b0d
Depends on #1173
Motivation
This PR is mostly a refactor for the P2P Multiprocess project, but it contains several general improvements and also a small bug fix.
Currently, the `ConnectionsManager` class keeps track of existing connections in their different states in multiple attributes with heterogeneous types. Connecting peers are identified by `IStreamClientEndpoint`s, handshaking peers are identified by protocol instances themselves, ready peers are identified by `PeerId`s, and there's also a `connections` attribute that holds all of them as protocol instances too.

The main refactor of this PR is moving, renaming, and changing all those attributes. They're moved to the new `PeerConnections` class, which now holds all logic related to handling the connections as they advance in their state machine.

Now, every connection is uniquely identified by its connection address, which is present in both inbound and outbound connections (contrary to the `entrypoint`, which is removed). This is less error-prone, as it's now impossible to connect to the same address twice (there was a small bug that briefly allowed bootstrap connections to the same address). Renames are also done to reduce confusion; specifically, "ready peers" was previously called "connected peers" and is updated.

Here's the list of `ConnectionsManager` attributes that are moved to `PeerConnections` and updated to use `PeerAddress`:

- `connecting_peers: dict[IStreamClientEndpoint, _ConnectingPeer]` → `_connecting_outbound: set[PeerAddress]`.
- `handshaking_peers: set[HathorProtocol]` → `_handshaking: dict[PeerAddress, HathorProtocol]`.
- `connected_peers: dict[PeerId, HathorProtocol]` → `_ready: dict[PeerAddress, HathorProtocol]`.
- `connections: set[HathorProtocol]` → removed, but equivalent methods are provided as `get_connected_peers` or similar.

Acceptance Criteria
- Implement the `PeerConnections` class, moving some connection logic from the `ConnectionsManager` to it.
- Add an `addr: PeerAddress` attribute so that each connection is uniquely identified. The `entrypoint` attribute is removed.
- Remove the `use_ssl` parameter from `ConnectionsManager.connect_to` and `listen`; this was unused, and the instance attribute is used instead.
- Remove the `"deferred"` key from the `status` API and update its address strings.

Checklist

- If merging into `master`, confirm this code is production-ready and can be included in future releases as soon as it gets merged