Skip to content

Commit

Permalink
[Java] Fix bug with MDS where destinations are setup in advance.
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Mar 25, 2019
1 parent c907b7c commit 44a4e38
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ private void cleanBufferTo(final long newCleanPosition)

private void trackConnection(final int transportIndex, final InetSocketAddress srcAddress, final long nowNs)
{
imageConnections = ArrayUtil.ensureCapacity(imageConnections, transportIndex + 1);
ImageConnection imageConnection = imageConnections[transportIndex];

if (null == imageConnection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,12 @@ public class MultiDestinationSubscriptionTest
SystemUtil.tmpDirName() + "aeron-system-tests-" + UUID.randomUUID().toString() + File.separator;

private final MediaDriver.Context driverContextA = new MediaDriver.Context();
private final MediaDriver.Context driverContextB = new MediaDriver.Context();

private Aeron clientA;
private Aeron clientB;
private MediaDriver driverA;
private MediaDriver driverB;
private Publication publicationA;
private Publication publicationB;
private Subscription subscription;
Expand All @@ -86,6 +89,20 @@ private void launch()
clientA = Aeron.connect(new Aeron.Context().aeronDirectoryName(driverContextA.aeronDirectoryName()));
}

private void launchSecond()
{
final String baseDirB = ROOT_DIR + "B";

driverContextB
.errorHandler(Throwable::printStackTrace)
.publicationTermBufferLength(TERM_BUFFER_LENGTH)
.aeronDirectoryName(baseDirB)
.threadingMode(ThreadingMode.SHARED);

driverB = MediaDriver.launch(driverContextB);
clientB = Aeron.connect(new Aeron.Context().aeronDirectoryName(driverContextB.aeronDirectoryName()));
}

@After
public void closeEverything()
{
Expand All @@ -94,6 +111,8 @@ public void closeEverything()
CloseHelper.close(subscription);
CloseHelper.close(clientA);
CloseHelper.close(driverA);
CloseHelper.close(clientB);
CloseHelper.close(driverB);

IoUtil.delete(new File(ROOT_DIR), true);
}
Expand Down Expand Up @@ -337,6 +356,92 @@ public void shouldSendToMultipleDestinationSubscriptionWithSameStream()
verifyFragments(fragmentHandler, numMessagesToSend);
}

@Test(timeout = 10_000)
public void shouldMergeStreamsFromMultiplePublicationsWIthSameParams() {
final int numMessagesToSend = 30;
final int numMessagesToSendForA = numMessagesToSend / 2;
final int numMessagesToSendForB = numMessagesToSend / 2;

launch();
launchSecond();

final ChannelUriStringBuilder builder = new ChannelUriStringBuilder();

builder
.clear()
.media(CommonContext.UDP_MEDIA)
.endpoint(UNICAST_ENDPOINT_A);

final String publicationChannelA = builder.build();

builder
.clear()
.media(CommonContext.UDP_MEDIA)
.endpoint(UNICAST_ENDPOINT_B);

final String destinationB = builder.build();

subscription = clientA.addSubscription(SUB_URI, STREAM_ID);
subscription.addDestination(publicationChannelA);
subscription.addDestination(destinationB);

publicationA = clientA.addExclusivePublication(publicationChannelA, STREAM_ID);

builder
.clear()
.media(CommonContext.UDP_MEDIA)
.initialPosition(0L, publicationA.initialTermId(), publicationA.termBufferLength())
.sessionId(publicationA.sessionId())
.endpoint(UNICAST_ENDPOINT_B);

final String publicationChannelB = builder.build();

publicationB = clientB.addExclusivePublication(publicationChannelB, STREAM_ID);

for (int i = 0; i < numMessagesToSendForA; i++)
{
while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
}

final MutableInteger fragmentsRead = new MutableInteger();
pollForFragment(subscription, fragmentHandler, fragmentsRead);

while (publicationB.offer(buffer, 0, buffer.capacity()) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
}

assertThat(subscription.poll(fragmentHandler, 10), is(0));
}

for (int i = 0; i < numMessagesToSendForB; i++)
{
while (publicationB.offer(buffer, 0, buffer.capacity()) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
}

final MutableInteger fragmentsRead = new MutableInteger();
pollForFragment(subscription, fragmentHandler, fragmentsRead);

while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
}

assertThat(subscription.poll(fragmentHandler, 10), is(0));
}

assertThat(subscription.imageCount(), is(1));
verifyFragments(fragmentHandler, numMessagesToSend);
}

private void pollForFragment(
final Subscription subscription, final FragmentHandler handler, final MutableInteger fragmentsRead)
{
Expand Down

0 comments on commit 44a4e38

Please sign in to comment.