Skip to content

Commit

Permalink
ATLAS-4966 : atlas-server-api module: update for code readability imp… (
Browse files Browse the repository at this point in the history
  • Loading branch information
pareshddevalia authored Feb 17, 2025
1 parent de0996b commit 43faf4f
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 57 deletions.
6 changes: 5 additions & 1 deletion server-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@
<name>Apache Atlas Server API</name>
<description>Apache Atlas Server related APIs</description>

<dependencies>
<properties>
<checkstyle.failOnViolation>true</checkstyle.failOnViolation>
<checkstyle.skip>false</checkstyle.skip>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client-v1</artifactId>
Expand Down
71 changes: 43 additions & 28 deletions server-api/src/main/java/org/apache/atlas/RequestContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@

import static org.apache.atlas.model.instance.AtlasObjectId.KEY_GUID;


public class RequestContext {
private static final Logger METRICS = LoggerFactory.getLogger("METRICS");

private static final ThreadLocal<RequestContext> CURRENT_CONTEXT = new ThreadLocal<>();
private static final Set<RequestContext> ACTIVE_REQUESTS = new HashSet<>();
private static final ThreadLocal<RequestContext> CURRENT_CONTEXT = new ThreadLocal<>();
private static final Set<RequestContext> ACTIVE_REQUESTS = new HashSet<>();
private static final boolean isMetricsEnabled = METRICS.isDebugEnabled();

private final long requestTime = System.currentTimeMillis();
Expand All @@ -58,28 +57,27 @@ public class RequestContext {
private final Map<String, List<AtlasClassification>> addedPropagations = new HashMap<>();
private final Map<String, List<AtlasClassification>> removedPropagations = new HashMap<>();
private final AtlasPerfMetrics metrics = isMetricsEnabled ? new AtlasPerfMetrics() : null;
private List<EntityGuidPair> entityGuidInRequest = null;
private List<EntityGuidPair> entityGuidInRequest;
private final Set<String> entitiesToSkipUpdate = new HashSet<>();
private final Set<String> onlyCAUpdateEntities = new HashSet<>();
private final Set<String> onlyBAUpdateEntities = new HashSet<>();
private final List<AtlasTask> queuedTasks = new ArrayList<>();


private String user;
private Set<String> userGroups;
private String clientIPAddress;
private List<String> forwardedAddresses;
private DeleteType deleteType = DeleteType.DEFAULT;
private boolean isPurgeRequested = false;
private int maxAttempts = 1;
private int attemptCount = 1;
private boolean isImportInProgress = false;
private boolean isMigrationInProgress = false;
private boolean isInNotificationProcessing = false;
private boolean isInTypePatching = false;
private boolean createShellEntityForNonExistingReference = false;
private boolean skipFailedEntities = false;
private String currentTypePatchAction = "";
private boolean isPurgeRequested;
private boolean skipFailedEntities;
private boolean isImportInProgress;
private boolean isMigrationInProgress;
private boolean isInNotificationProcessing;
private boolean isInTypePatching;
private boolean createShellEntityForNonExistingReference;
private DeleteType deleteType = DeleteType.DEFAULT;
private String currentTypePatchAction = "";
private int maxAttempts = 1;
private int attemptCount = 1;

private RequestContext() {
}
Expand All @@ -91,6 +89,7 @@ public static RequestContext get() {

if (ret == null) {
ret = new RequestContext();

CURRENT_CONTEXT.set(ret);

synchronized (ACTIVE_REQUESTS) {
Expand Down Expand Up @@ -141,20 +140,24 @@ public void clearCache() {

public static String getCurrentUser() {
RequestContext context = CURRENT_CONTEXT.get();
String ret = context != null ? context.getUser() : null;
String ret = context != null ? context.getUser() : null;

if (StringUtils.isBlank(ret)) {
try {
ret = UserGroupInformation.getLoginUser().getShortUserName();
} catch (Exception e) {
ret = null;
}
if (StringUtils.isBlank(ret)){

if (StringUtils.isBlank(ret)) {
ret = System.getProperty("user.name");

if (StringUtils.isBlank(ret)) {
ret = "atlas";
}
}
}

return ret;
}

Expand All @@ -171,9 +174,13 @@ public void setUser(String user, Set<String> userGroups) {
this.userGroups = userGroups;
}

public DeleteType getDeleteType() { return deleteType; }
public DeleteType getDeleteType() {
return deleteType;
}

public void setDeleteType(DeleteType deleteType) { this.deleteType = (deleteType == null) ? DeleteType.DEFAULT : deleteType; }
public void setDeleteType(DeleteType deleteType) {
this.deleteType = (deleteType == null) ? DeleteType.DEFAULT : deleteType;
}

public String getClientIPAddress() {
return clientIPAddress;
Expand Down Expand Up @@ -215,9 +222,13 @@ public void setImportInProgress(boolean importInProgress) {
isImportInProgress = importInProgress;
}

public boolean isPurgeRequested() { return isPurgeRequested; }
public boolean isPurgeRequested() {
return isPurgeRequested;
}

public void setPurgeRequested(boolean isPurgeRequested) { this.isPurgeRequested = isPurgeRequested; }
public void setPurgeRequested(boolean isPurgeRequested) {
this.isPurgeRequested = isPurgeRequested;
}

public boolean isInNotificationProcessing() {
return isInNotificationProcessing;
Expand Down Expand Up @@ -260,25 +271,25 @@ public void setCurrentTypePatchAction(String currentTypePatchAction) {
}

public void recordEntityUpdate(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null && ! entitiesToSkipUpdate.contains(entity.getGuid())) {
if (entity != null && entity.getGuid() != null && !entitiesToSkipUpdate.contains(entity.getGuid())) {
updatedEntities.put(entity.getGuid(), entity);
}
}

public void recordEntityToSkip(String guid) {
if(! StringUtils.isEmpty(guid)) {
if (!StringUtils.isEmpty(guid)) {
entitiesToSkipUpdate.add(guid);
}
}

public void recordEntityWithCustomAttributeUpdate(String guid) {
if(! StringUtils.isEmpty(guid)) {
if (!StringUtils.isEmpty(guid)) {
onlyCAUpdateEntities.add(guid);
}
}

public void recordEntityWithBusinessAttributeUpdate(String guid) {
if(! StringUtils.isEmpty(guid)) {
if (!StringUtils.isEmpty(guid)) {
onlyBAUpdateEntities.add(guid);
}
}
Expand Down Expand Up @@ -378,7 +389,9 @@ public AtlasEntity getDifferentialEntity(String guid) {
return diffEntityCache.get(guid);
}

public Collection<AtlasEntity> getDifferentialEntities() { return diffEntityCache.values(); }
public Collection<AtlasEntity> getDifferentialEntities() {
return diffEntityCache.values();
}

public Collection<AtlasEntityHeader> getUpdatedEntities() {
return updatedEntities.values();
Expand Down Expand Up @@ -423,7 +436,9 @@ public boolean isDeletedEntity(String guid) {
return deletedEntities.containsKey(guid);
}

public MetricRecorder startMetricRecord(String name) { return metrics != null ? metrics.getMetricRecorder(name) : null; }
public MetricRecorder startMetricRecord(String name) {
return metrics != null ? metrics.getMetricRecorder(name) : null;
}

public void endMetricRecord(MetricRecorder recorder) {
if (metrics != null && recorder != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import java.net.InetSocketAddress;

public class AtlasServerIdSelector {

private static final Logger LOG = LoggerFactory.getLogger(AtlasServerIdSelector.class);

private AtlasServerIdSelector() {
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
}

/**
* Return the ID corresponding to this Atlas instance.
*
Expand All @@ -46,37 +49,42 @@ public class AtlasServerIdSelector {
*/
public static String selectServerId(Configuration configuration) throws AtlasException {
// ids are already trimmed by this method
String[] ids = configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS);
String matchingServerId = null;
int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT));
String[] ids = configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS);
String matchingServerId = null;
int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT));

for (String id : ids) {
String hostPort = configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +id);
String hostPort = configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX + id);

if (!StringUtils.isEmpty(hostPort)) {
InetSocketAddress socketAddress;

try {
socketAddress = NetUtils.createSocketAddr(hostPort);
} catch (Exception e) {
LOG.warn("Exception while trying to get socket address for {}", hostPort, e);

continue;
}
if (!socketAddress.isUnresolved()
&& NetUtils.isLocalAddress(socketAddress.getAddress())
&& appPort == socketAddress.getPort()) {

if (!socketAddress.isUnresolved() && NetUtils.isLocalAddress(socketAddress.getAddress()) && appPort == socketAddress.getPort()) {
LOG.info("Found matched server id {} with host port: {}", id, hostPort);

matchingServerId = id;

break;
}
} else {
LOG.info("Could not find matching address entry for id: {}", id);
}
}

if (matchingServerId == null) {
String msg = String.format("Could not find server id for this instance. " +
"Unable to find IDs matching any local host and port binding among %s",
StringUtils.join(ids, ","));
String msg = String.format("Could not find server id for this instance. Unable to find IDs matching any local host and port binding among %s", StringUtils.join(ids, ","));

throw new AtlasException(msg);
}

return matchingServerId;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* The two state transitions we handle are (1) becoming active and (2) becoming passive.
*/
public interface ActiveStateChangeHandler {
public enum HandlerOrder {
enum HandlerOrder {
AUDIT_REPOSITORY(0),
GRAPH_BACKED_SEARCH_INDEXER(1),
TYPEDEF_STORE_INITIALIZER(2),
Expand All @@ -38,11 +38,13 @@ public enum HandlerOrder {

private final int order;

private HandlerOrder(int order) {
HandlerOrder(int order) {
this.order = order;
}

public int getOrder() { return order; }
public int getOrder() {
return order;
}
}

/**
Expand All @@ -63,14 +65,13 @@ private HandlerOrder(int order) {
*
* @throws {@link AtlasException} if anything is wrong on shutdown
*/
void instanceIsPassive() throws AtlasException;

void instanceIsPassive() throws AtlasException;

/**
* Defines the order in which the handler should be called.
* When state becomes active, the handler will be called from low order to high
* When state becomes passive, the handler will be called from high order to low
*
*/
int getHandlerOrder();
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,14 @@ public interface EntityChangeListener {

/**
* This is upon updating a trait from a typed instance.
*
* @param entity the entity
* @param traits trait that needs to be added to entity
*
* @throws AtlasException if the listener notification fails
*/
void onTraitsUpdated(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException;

/**
* This is upon deleting entities from the repository.
*
* @param entities the deleted entities
* @param isImport
* @throws AtlasException
Expand All @@ -90,7 +87,6 @@ public interface EntityChangeListener {

/**
* This is upon adding a new term to a list of typed instance.
*
* @param entities entity list
* @param term term that needs to be added to entity
* @throws AtlasException if the listener notification fails
Expand All @@ -99,7 +95,6 @@ public interface EntityChangeListener {

/**
* This is upon adding a new trait to a typed instance.
*
* @param entities entity list
* @param term term that needs to be added to entity
* @throws AtlasException if the listener notification fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
* Types change notification listener.
*/
public interface TypesChangeListener {

/**
* This is upon adding new type(s) to Store.
*
Expand All @@ -44,6 +43,6 @@ public interface TypesChangeListener {
*/
// void onRemove(String typeName) throws MetadataException;

//This is upon updating an existing type to the store
void onChange(Collection<? extends AtlasType> dataTypes) throws AtlasException;
//This is upon updating an existing type to the store
void onChange(Collection<? extends AtlasType> dataTypes) throws AtlasException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.atlas.setup;

public class SetupException extends Exception {

public SetupException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/

package org.apache.atlas.setup;

/**
* Represents a step that initializes some dependency of Atlas.
*
Expand Down

0 comments on commit 43faf4f

Please sign in to comment.