Skip to content

Commit

Permalink
[Java] Don't wait for receiver to close channel endpoint in the drive…
Browse files Browse the repository at this point in the history
…r conductor as this is no longer required and can cause he driver to lockup. Issue #338.
  • Loading branch information
mjpt777 committed Apr 12, 2017
1 parent ee409cc commit 2fef3e4
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 17 deletions.
13 changes: 4 additions & 9 deletions aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ public DriverConductor(final Context ctx)

public void onClose()
{
sendChannelEndpointByChannelMap.values().forEach(SendChannelEndpoint::close);
receiveChannelEndpointByChannelMap.values().forEach(ReceiveChannelEndpoint::close);
networkPublications.forEach(NetworkPublication::close);
publicationImages.forEach(PublicationImage::close);
ipcPublications.forEach(IpcPublication::close);
sendChannelEndpointByChannelMap.values().forEach(SendChannelEndpoint::close);
receiveChannelEndpointByChannelMap.values().forEach(ReceiveChannelEndpoint::close);
}

public String roleName()
Expand Down Expand Up @@ -702,11 +702,6 @@ void onRemoveSubscription(final long registrationId, final long correlationId)
channelEndpoint.closeStatusIndicator();
receiveChannelEndpointByChannelMap.remove(channelEndpoint.udpChannel().canonicalForm());
receiverProxy.closeReceiveChannelEndpoint(channelEndpoint);

while (!channelEndpoint.isClosed())
{
Thread.yield();
}
}
}

Expand Down Expand Up @@ -1240,7 +1235,7 @@ private PublicationParams getPublicationParams(
{
if (count < 3)
{
throw new IllegalStateException("Params must be used as a set: " +
throw new IllegalStateException("Params must be used as a complete set: " +
INITIAL_TERM_ID_PARAM_NAME + " " + TERM_ID_PARAM_NAME + " " + TERM_OFFSET_PARAM_NAME);
}

Expand All @@ -1252,7 +1247,7 @@ private PublicationParams getPublicationParams(
{
throw new IllegalStateException(
TERM_OFFSET_PARAM_NAME + "=" + params.termOffset + " > " +
TERM_LENGTH_PARAM_NAME + "=" + params.termLength);
TERM_LENGTH_PARAM_NAME + "=" + params.termLength);
}

params.isReplay = true;
Expand Down
2 changes: 1 addition & 1 deletion aeron-driver/src/main/java/io/aeron/driver/Receiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private void checkPendingSetupMessages(final long nowNs)
else if (pending.shouldElicitSetupMessage())
{
pending.channelEndpoint().sendSetupElicitingStatusMessage(
pending.controlAddress(), pending.sessionId(), pending.streamId());
pending.controlAddress(), pending.sessionId(), pending.streamId());
pending.timeOfStatusMessage(nowNs);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ReceiveChannelEndpoint extends UdpChannelTransport
private final Int2IntCounterMap refCountByStreamIdMap = new Int2IntCounterMap(0);

private final long receiverId;
private volatile boolean isClosed = false;
private boolean isClosed = false;

public ReceiveChannelEndpoint(
final UdpChannel udpChannel,
Expand Down Expand Up @@ -140,11 +140,6 @@ public void close()
isClosed = true;
}

public boolean isClosed()
{
return isClosed;
}

public void openChannel()
{
openDatagramChannel(statusIndicator);
Expand Down Expand Up @@ -190,7 +185,7 @@ public boolean hasExplicitControl()

public InetSocketAddress explicitControlAddress()
{
return (udpChannel.hasExplicitControl()) ? udpChannel.localControl() : null;
return udpChannel.hasExplicitControl() ? udpChannel.localControl() : null;
}

public int onDataPacket(
Expand Down

0 comments on commit 2fef3e4

Please sign in to comment.