Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reject current publications that specify term-id, term-offset or term-length #773

Merged
merged 2 commits into from
Dec 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 74 additions & 70 deletions aeron-driver/src/main/c/uri/aeron_uri.c
Original file line number Diff line number Diff line change
Expand Up @@ -473,91 +473,95 @@ int aeron_uri_publication_params(
return -1;
}

if (is_exclusive)
int count = 0;

int32_t initial_term_id;
int32_t term_id;
int parse_result;

parse_result = aeron_uri_get_int32(uri_params, AERON_URI_INITIAL_TERM_ID_KEY, &initial_term_id);
if (parse_result < 0)
{
int count = 0;
return -1;
}
count += parse_result > 0 ? 1 : 0;

parse_result = aeron_uri_get_int32(uri_params, AERON_URI_TERM_ID_KEY, &term_id);
if (parse_result < 0)
{
return -1;
}
count += parse_result > 0 ? 1 : 0;

const char *term_offset_str = aeron_uri_find_param_value(uri_params, AERON_URI_TERM_OFFSET_KEY);
count += term_offset_str ? 1 : 0;

int32_t initial_term_id;
int32_t term_id;
int parse_result;
if (count > 0)
{
char *end_ptr = NULL;

parse_result = aeron_uri_get_int32(uri_params, AERON_URI_INITIAL_TERM_ID_KEY, &initial_term_id);
if (parse_result < 0)
if (!is_exclusive)
{
aeron_set_err(
EINVAL, "params: %s %s %s are not supported for concurrent publications",
AERON_URI_INITIAL_TERM_ID_KEY, AERON_URI_TERM_ID_KEY, AERON_URI_TERM_OFFSET_KEY);
return -1;
}
count += parse_result > 0 ? 1 : 0;

parse_result = aeron_uri_get_int32(uri_params, AERON_URI_TERM_ID_KEY, &term_id);
if (parse_result < 0)
if (count < 3)
{
aeron_set_err(
EINVAL, "params must be used as a complete set: %s %s %s", AERON_URI_INITIAL_TERM_ID_KEY,
AERON_URI_TERM_ID_KEY, AERON_URI_TERM_OFFSET_KEY);
return -1;
}
count += parse_result > 0 ? 1 : 0;

const char *term_offset_str = aeron_uri_find_param_value(uri_params, AERON_URI_TERM_OFFSET_KEY);
count += term_offset_str ? 1 : 0;

if (count > 0)
errno = 0;
end_ptr = NULL;
uint64_t term_offset = strtoull(term_offset_str, &end_ptr, 0);
if ((term_offset == 0 && 0 != errno) || end_ptr == term_offset_str)
{
char *end_ptr = NULL;

if (count < 3)
{
aeron_set_err(
EINVAL, "params must be used as a complete set: %s %s %s", AERON_URI_INITIAL_TERM_ID_KEY,
AERON_URI_TERM_ID_KEY, AERON_URI_TERM_OFFSET_KEY);
return -1;
}

errno = 0;
end_ptr = NULL;
uint64_t term_offset = strtoull(term_offset_str, &end_ptr, 0);
if ((term_offset == 0 && 0 != errno) || end_ptr == term_offset_str)
{
aeron_set_err(EINVAL, "could not parse %s in URI", AERON_URI_TERM_OFFSET_KEY);
return -1;
}

if (aeron_sub_wrap_i32(term_id, initial_term_id) < 0)
{
aeron_set_err(
EINVAL,
"Param difference greater than 2^31 - 1: %s=%" PRId32 " %s=%" PRId32,
AERON_URI_INITIAL_TERM_ID_KEY,
initial_term_id,
AERON_URI_TERM_OFFSET_KEY,
term_id);
return -1;
}
aeron_set_err(EINVAL, "could not parse %s in URI", AERON_URI_TERM_OFFSET_KEY);
return -1;
}

if (term_offset > params->term_length)
{
aeron_set_err(
EINVAL,
"Param %s=%" PRIu64 " > %s=%" PRIu64,
AERON_URI_TERM_OFFSET_KEY,
term_offset,
AERON_URI_TERM_LENGTH_KEY,
params->term_length);
return -1;
}
if (aeron_sub_wrap_i32(term_id, initial_term_id) < 0)
{
aeron_set_err(
EINVAL,
"Param difference greater than 2^31 - 1: %s=%" PRId32 " %s=%" PRId32,
AERON_URI_INITIAL_TERM_ID_KEY,
initial_term_id,
AERON_URI_TERM_OFFSET_KEY,
term_id);
return -1;
}

if ((term_offset & (AERON_LOGBUFFER_FRAME_ALIGNMENT - 1u)) != 0)
{
aeron_set_err(
EINVAL,
"Param %s=%" PRIu64 " must be multiple of FRAME_ALIGNMENT",
AERON_URI_TERM_OFFSET_KEY,
params->term_offset);
return -1;
}
if (term_offset > params->term_length)
{
aeron_set_err(
EINVAL,
"Param %s=%" PRIu64 " > %s=%" PRIu64,
AERON_URI_TERM_OFFSET_KEY,
term_offset,
AERON_URI_TERM_LENGTH_KEY,
params->term_length);
return -1;
}

params->term_offset = term_offset;
params->initial_term_id = initial_term_id;
params->term_id = term_id;
params->has_position = true;
if ((term_offset & (AERON_LOGBUFFER_FRAME_ALIGNMENT - 1u)) != 0)
{
aeron_set_err(
EINVAL,
"Param %s=%" PRIu64 " must be multiple of FRAME_ALIGNMENT",
AERON_URI_TERM_OFFSET_KEY,
params->term_offset);
return -1;
}

params->term_offset = term_offset;
params->initial_term_id = initial_term_id;
params->term_id = term_id;
params->has_position = true;
}

const char *value_str;
Expand Down
86 changes: 44 additions & 42 deletions aeron-driver/src/main/java/io/aeron/driver/PublicationParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,60 +61,62 @@ static PublicationParams getPublicationParams(
params.getSparse(channelUri);
params.getEos(channelUri);

if (isExclusive)
{
int count = 0;
int count = 0;

final String initialTermIdStr = channelUri.get(INITIAL_TERM_ID_PARAM_NAME);
count = initialTermIdStr != null ? count + 1 : count;
final String initialTermIdStr = channelUri.get(INITIAL_TERM_ID_PARAM_NAME);
count = initialTermIdStr != null ? count + 1 : count;

final String termIdStr = channelUri.get(TERM_ID_PARAM_NAME);
count = termIdStr != null ? count + 1 : count;
final String termIdStr = channelUri.get(TERM_ID_PARAM_NAME);
count = termIdStr != null ? count + 1 : count;

final String termOffsetStr = channelUri.get(TERM_OFFSET_PARAM_NAME);
count = termOffsetStr != null ? count + 1 : count;
final String termOffsetStr = channelUri.get(TERM_OFFSET_PARAM_NAME);
count = termOffsetStr != null ? count + 1 : count;

if (count > 0)
if (count > 0)
{
if (!isExclusive)
{
if (count < 3)
{
throw new IllegalArgumentException("params must be used as a complete set: " +
INITIAL_TERM_ID_PARAM_NAME + " " + TERM_ID_PARAM_NAME + " " + TERM_OFFSET_PARAM_NAME);
}

params.initialTermId = Integer.parseInt(initialTermIdStr);
params.termId = Integer.parseInt(termIdStr);
params.termOffset = Integer.parseInt(termOffsetStr);
throw new IllegalArgumentException("params: " + INITIAL_TERM_ID_PARAM_NAME + " " + TERM_ID_PARAM_NAME +
" " + TERM_OFFSET_PARAM_NAME + " are not supported for concurrent publications");
}
if (count < 3)
{
throw new IllegalArgumentException("params must be used as a complete set: " +
INITIAL_TERM_ID_PARAM_NAME + " " + TERM_ID_PARAM_NAME + " " + TERM_OFFSET_PARAM_NAME);
}

if (params.termOffset > params.termLength)
{
throw new IllegalArgumentException(
TERM_OFFSET_PARAM_NAME + "=" + params.termOffset + " > " +
TERM_LENGTH_PARAM_NAME + "=" + params.termLength);
}
params.initialTermId = Integer.parseInt(initialTermIdStr);
params.termId = Integer.parseInt(termIdStr);
params.termOffset = Integer.parseInt(termOffsetStr);

if (params.termOffset < 0 || params.termOffset > LogBufferDescriptor.TERM_MAX_LENGTH)
{
throw new IllegalArgumentException(
TERM_OFFSET_PARAM_NAME + "=" + params.termOffset + " out of range");
}
if (params.termOffset > params.termLength)
{
throw new IllegalArgumentException(
TERM_OFFSET_PARAM_NAME + "=" + params.termOffset + " > " +
TERM_LENGTH_PARAM_NAME + "=" + params.termLength);
}

if ((params.termOffset & (FrameDescriptor.FRAME_ALIGNMENT - 1)) != 0)
{
throw new IllegalArgumentException(
TERM_OFFSET_PARAM_NAME + "=" + params.termOffset + " must be a multiple of FRAME_ALIGNMENT");
}
if (params.termOffset < 0 || params.termOffset > LogBufferDescriptor.TERM_MAX_LENGTH)
{
throw new IllegalArgumentException(
TERM_OFFSET_PARAM_NAME + "=" + params.termOffset + " out of range");
}

if (params.termId - params.initialTermId < 0)
{
throw new IllegalStateException(
"difference greater than 2^31 - 1: " + INITIAL_TERM_ID_PARAM_NAME + "=" +
params.initialTermId + " when " + TERM_ID_PARAM_NAME + "=" + params.termId);
if ((params.termOffset & (FrameDescriptor.FRAME_ALIGNMENT - 1)) != 0)
{
throw new IllegalArgumentException(
TERM_OFFSET_PARAM_NAME + "=" + params.termOffset + " must be a multiple of FRAME_ALIGNMENT");
}

}
if (params.termId - params.initialTermId < 0)
{
throw new IllegalStateException(
"difference greater than 2^31 - 1: " + INITIAL_TERM_ID_PARAM_NAME + "=" +
params.initialTermId + " when " + TERM_ID_PARAM_NAME + "=" + params.termId);

params.hasPosition = true;
}

params.hasPosition = true;
}

return params;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.aeron;

import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import io.aeron.exceptions.RegistrationException;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.test.TestMediaDriver;
import org.agrona.ErrorHandler;
import org.junit.Test;

import static org.mockito.Mockito.mock;

public class SpecifiedPositionPublicationTest
{
private final ErrorHandler mockErrorHandler = mock(ErrorHandler.class);
private final MediaDriver.Context mediaDriverContext = new MediaDriver.Context()
.errorHandler(mockErrorHandler)
.dirDeleteOnShutdown(true)
.publicationTermBufferLength(LogBufferDescriptor.TERM_MIN_LENGTH)
.threadingMode(ThreadingMode.SHARED);

@Test(expected = RegistrationException.class)
public void shouldRejectSpecifiedPositionForConcurrentPublications()
{
try (TestMediaDriver testMediaDriver = TestMediaDriver.launch(mediaDriverContext);
Aeron aeron = Aeron.connect())
{
final String channel = new ChannelUriStringBuilder()
.media("ipc")
.initialPosition(1024, -873648623, 65536)
.build();
aeron.addPublication(channel, 101);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public void recordAndReplayRegularPublication()

prePublicationActionsAndVerifications(archiveProxy, controlPublication, recordingEvents);

final Publication recordedPublication = client.addPublication(publishUri, PUBLISH_STREAM_ID);
final Publication recordedPublication = client.addExclusivePublication(publishUri, PUBLISH_STREAM_ID);

final int sessionId = recordedPublication.sessionId();
final int termBufferLength = recordedPublication.termBufferLength();
Expand Down