Skip to content

Commit

Permalink
Merge pull request #462 from Hydrospheredata/feature/spark_logs_appender
Browse files Browse the repository at this point in the history
Feature/spark logs appender
  • Loading branch information
dos65 authored Mar 22, 2018
2 parents 9eafe30 + d9943d9 commit f25a89b
Show file tree
Hide file tree
Showing 9 changed files with 210 additions and 44 deletions.
29 changes: 4 additions & 25 deletions mist-lib/src/main/scala/io/hydrosphere/mist/api/Logging.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.hydrosphere.mist.api

import io.hydrosphere.mist.api.logging.MistLogging.{LogEvent, LogsWriter, RemoteLogsWriter, Slf4jWriter}
import io.hydrosphere.mist.api.logging.MistLogging.{LogEvent, Slf4jWriter}

trait Logging extends ContextSupport {

Expand All @@ -13,39 +13,18 @@ trait Logging extends ContextSupport {
this.jobId = conf.info.id
}

def getLogger: MLogger = new MLogger(jobId, loggingConf)
def getLogger: MLogger = new MLogger(jobId)

override private[mist] def stop(): Unit = {
super.stop()
}

}

class MLogger(
sourceId: String,
centralConf: Option[CentralLoggingConf]
) {
class MLogger(sourceId: String) {

@transient
lazy val writer = centralConf match {
case Some(conf) => new LogsWriter {
import conf._

val slf4j = new Slf4jWriter(sourceId)
val remote = RemoteLogsWriter.getOrCreate(host, port)

override def write(e: LogEvent): Unit = {
slf4j.write(e)
remote.write(e)
}

}

case None =>
val slf4j = new Slf4jWriter(sourceId)
slf4j.write(LogEvent.mkInfo(sourceId, "Central logging is not configured, using default slf4j"))
slf4j
}
lazy val writer = new Slf4jWriter(sourceId)

def debug(msg: String): Unit = {
val e = LogEvent.mkDebug(sourceId, msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.util.ByteString
import com.twitter.chill.{KryoPool, ScalaKryoInstantiator}
import com.typesafe.config.ConfigFactory
import io.hydrosphere.mist.api.logging.MistLogging.RemoteLogsWriter.Key
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.Await
Expand Down Expand Up @@ -79,7 +80,6 @@ private [mist] object MistLogging {
writer.toString
})


LogEvent(from, message, ts, Level.Error.value, errTrace)
}

Expand All @@ -94,6 +94,7 @@ private [mist] object MistLogging {

def write(e: LogEvent): Unit

def close(): Unit
}

class Slf4jWriter(sourceId: String) extends LogsWriter {
Expand All @@ -106,6 +107,9 @@ private [mist] object MistLogging {
case Level.Warn => sl4jLogger.warn(e.message)
case Level.Error => sl4jLogger.error(e.message, e)
}

override def close(): Unit = ()

}

class RemoteLogsWriter(host: String, port: Int) extends LogsWriter {
Expand Down Expand Up @@ -144,13 +148,13 @@ private [mist] object MistLogging {

def close(): Unit = {
mat.shutdown()
RemoteLogsWriter.remove(Key(host, port))
Await.result(sys.terminate(), Duration.Inf)
}

}

object RemoteLogsWriter {

val systemConfig = ConfigFactory.parseString(
"akka.daemonic = on"
)
Expand All @@ -159,14 +163,18 @@ private [mist] object MistLogging {
private val writers = new ConcurrentHashMap[Key, RemoteLogsWriter]()

def getOrCreate(host: String, port: Int): RemoteLogsWriter = {
getWith(host, port)(key => new RemoteLogsWriter(key.host, key.port))
}

def getWith(host: String, port: Int)(createFn: Key => RemoteLogsWriter): RemoteLogsWriter = {
val key = Key(host, port)
writers.computeIfAbsent(key, new java.util.function.Function[Key, RemoteLogsWriter] {
override def apply(k: Key): RemoteLogsWriter = {
new RemoteLogsWriter(k.host, k.port)
}
override def apply(k: Key): RemoteLogsWriter = createFn(k)
})
}

def remove(key: Key): RemoteLogsWriter = writers.remove(key)

private case class Key(host: String, port: Int)

}
Expand Down
2 changes: 1 addition & 1 deletion mist-lib/src/main/scala/mist/api/MistExtrasDef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object MistExtras {
Extracted(new MistExtras(
jobId = jobId,
workerId = workerId,
logger = new MLogger(jobId, ctx.setupConf.loggingConf)
logger = new MLogger(jobId)
))
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ object MasterBridge {
new NamedContext(
sparkContext,
id,
org.apache.spark.streaming.Duration(init.streamingDuration.toMillis),
Option(centralLoggingConf)
Option(centralLoggingConf),
org.apache.spark.streaming.Duration(init.streamingDuration.toMillis)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.hydrosphere.mist.worker
import java.io.File

import io.hydrosphere.mist.api.{CentralLoggingConf, RuntimeJobInfo, SetupConfiguration}
import org.apache.commons.io.IOUtils
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
Expand All @@ -14,8 +15,8 @@ import scala.collection.mutable
class NamedContext(
val sparkContext: SparkContext,
val namespace: String,
streamingDuration: Duration = Duration(40 * 1000),
loggingConf: Option[CentralLoggingConf] = None
val loggingConf: Option[CentralLoggingConf] = None,
streamingDuration: Duration = Duration(40 * 1000)
) {

private val jars = mutable.Buffer.empty[String]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.hydrosphere.mist.worker

import io.hydrosphere.mist.api.CentralLoggingConf
import io.hydrosphere.mist.api.logging.MistLogging
import io.hydrosphere.mist.api.logging.MistLogging.{LogsWriter, RemoteLogsWriter}
import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.{AppenderSkeleton, Level, SimpleLayout}

/**
  * Log4j appender that forwards job log events to a Mist [[LogsWriter]],
  * tagging each event with the originating job's `sourceId`.
  */
class RemoteAppender(sourceId: String, logsWriter: LogsWriter) extends AppenderSkeleton {

  /** Converts the incoming log4j event and hands it to the underlying writer. */
  override def append(event: LoggingEvent): Unit = logsWriter.write(toMistEvent(event))

  /** Closing the appender closes the underlying writer as well. */
  override def close(): Unit = logsWriter.close()

  // A layout is mandatory: it is used to render events of unmapped levels.
  override def requiresLayout(): Boolean = true

  // Maps log4j levels onto MistLogging levels. Levels without a direct
  // counterpart (e.g. FATAL) are rendered through the configured layout
  // and reported at Info level.
  private def toMistEvent(event: LoggingEvent): MistLogging.LogEvent = {
    val ts = event.timeStamp
    val message = event.getRenderedMessage
    event.getLevel match {
      case Level.DEBUG => MistLogging.LogEvent.mkDebug(sourceId, message, ts)
      case Level.INFO  => MistLogging.LogEvent.mkInfo(sourceId, message, ts)
      case Level.WARN  => MistLogging.LogEvent.mkWarn(sourceId, message, ts)
      case Level.ERROR =>
        MistLogging.LogEvent.mkError(
          sourceId, message,
          Option(event.getThrowableInformation).map(_.getThrowable),
          ts
        )
      case _ => MistLogging.LogEvent.mkInfo(sourceId, this.getLayout.format(event), ts)
    }
  }
}


object RemoteAppender {

  /**
    * Builds an appender wired to the shared remote writer for the
    * host/port given in `loggingConf`.
    */
  def apply(sourceId: String, loggingConf: CentralLoggingConf): RemoteAppender = {
    val writer = RemoteLogsWriter.getOrCreate(loggingConf.host, loggingConf.port)
    create(sourceId, writer)
  }

  /**
    * Builds an appender over an arbitrary writer. The appender is named
    * after `sourceId` (so it can later be removed by that name) and gets
    * a [[SimpleLayout]] for rendering unmapped log levels.
    */
  def create(sourceId: String, logsWriter: LogsWriter): RemoteAppender = {
    val appender = new RemoteAppender(sourceId, logsWriter)
    appender.setLayout(new SimpleLayout)
    appender.setName(sourceId)
    appender
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package io.hydrosphere.mist.worker
import java.util.concurrent.Executors

import akka.actor._
import io.hydrosphere.mist.api.CentralLoggingConf
import io.hydrosphere.mist.core.CommonData._
import io.hydrosphere.mist.worker.runners._
import mist.api.data.JsLikeData
import org.apache.log4j.{Appender, LogManager}
import org.apache.spark.streaming.StreamingContext
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.concurrent._
import scala.util.{Failure, Success}

Expand All @@ -23,6 +24,8 @@ trait JobStarting {
val runnerSelector: RunnerSelector
val namedContext: NamedContext
val artifactDownloader: ArtifactDownloader
protected val rootLogger = LogManager.getRootLogger


protected final def startJob(
req: RunJobRequest
Expand Down Expand Up @@ -72,7 +75,8 @@ trait JobStarting {
class WorkerActor(
val runnerSelector: RunnerSelector,
val namedContext: NamedContext,
val artifactDownloader: ArtifactDownloader
val artifactDownloader: ArtifactDownloader,
mkAppender: String => Option[Appender]
) extends Actor with JobStarting with ActorLogging {

implicit val ec = {
Expand All @@ -88,6 +92,7 @@ class WorkerActor(

private def awaitRequest(): Receive = {
case req: RunJobRequest =>
mkAppender(req.id).foreach(rootLogger.addAppender)
val jobStarted = startJob(req)
val unit = ExecutionUnit(sender(), jobStarted)
context become running(unit)
Expand All @@ -101,6 +106,7 @@ class WorkerActor(
sender() ! WorkerIsBusy(id)
case resp: JobResponse =>
log.info(s"Job execution done. Returning result $resp and become awaiting new request")
rootLogger.removeAppender(resp.id)
execution.requester ! resp
context become awaitRequest()

Expand All @@ -121,11 +127,13 @@ class WorkerActor(
self ! PoisonPill
case resp: JobResponse =>
log.info(s"Job execution done. Returning result $resp and shutting down")
rootLogger.removeAppender(resp.id)
execution.requester ! resp
self ! PoisonPill
}

private def cancel(id: String, respond: ActorRef): Unit = {
rootLogger.removeAppender(id)
namedContext.sparkContext.cancelJobGroup(id)
StreamingContext.getActive().foreach( _.stop(stopSparkContext = false, stopGracefully = true))
respond ! JobIsCancelled(id)
Expand All @@ -143,14 +151,19 @@ object WorkerActor {
def props(
context: NamedContext,
artifactDownloader: ArtifactDownloader,
runnerSelector: RunnerSelector
runnerSelector: RunnerSelector,
mkAppender: String => Option[Appender]
): Props =
Props(new WorkerActor(runnerSelector, context, artifactDownloader))
Props(new WorkerActor(runnerSelector, context, artifactDownloader, mkAppender))

def props(context: NamedContext, artifactDownloader: ArtifactDownloader, runnerSelector: RunnerSelector): Props =
props(context, artifactDownloader, runnerSelector, mkAppenderF(context.loggingConf))

def props(context: NamedContext, artifactDownloader: ArtifactDownloader): Props =
props(context, artifactDownloader, new SimpleRunnerSelector)

def mkAppenderF(conf: Option[CentralLoggingConf]): String => Option[Appender] =
(id: String) => conf.map(c => RemoteAppender(id, c))

def props(
context: NamedContext,
artifactDownloader: ArtifactDownloader
): Props =
Props(new WorkerActor(new SimpleRunnerSelector, context, artifactDownloader))
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.hydrosphere.mist.worker

import io.hydrosphere.mist.api.CentralLoggingConf
import io.hydrosphere.mist.api.logging.MistLogging
import io.hydrosphere.mist.api.logging.MistLogging.{LogEvent, LogsWriter}
import io.hydrosphere.mist.core.MockitoSugar
import org.apache.log4j.spi.{LoggingEvent, NOPLogger, NOPLoggerRepository}
import org.apache.log4j.{Category, Level}
import org.mockito.Matchers.{eq => mockitoEq}
import org.mockito.Mockito._
import org.scalatest.{FunSpecLike, Matchers}

/** Unit tests for [[RemoteAppender]]: construction and level mapping. */
class RemoteAppenderSpec extends FunSpecLike with Matchers with MockitoSugar {


  // Inert logger instance required by the LoggingEvent constructor; never logs.
  val noopLogger = new NOPLogger(new NOPLoggerRepository, "forTest")

  it("should create appender with layout") {
    // RemoteAppender.apply must preconfigure a layout (requiresLayout is true).
    val appender = RemoteAppender("id", CentralLoggingConf("localhost", 2005))
    appender.getLayout should not be null
    appender.close()
  }

  it("should send message to logs writer with correct level") {

    val logsWriter = mock[LogsWriter]

    doNothing()
      .when(logsWriter).write(any[LogEvent])

    val appender = RemoteAppender.create("id", logsWriter)

    // Each log4j level should map onto the corresponding MistLogging level.
    appender.append(mkLoggingEvent(Level.INFO, "info"))
    verify(logsWriter).write(mockitoEq(LogEvent("id", "info", 1L, MistLogging.Level.Info.value, None)))

    appender.append(mkLoggingEvent(Level.DEBUG, "debug"))
    verify(logsWriter).write(mockitoEq(LogEvent("id", "debug", 1L, MistLogging.Level.Debug.value, None)))

    appender.append(mkLoggingEvent(Level.WARN, "warn"))
    verify(logsWriter).write(mockitoEq(LogEvent("id", "warn", 1L, MistLogging.Level.Warn.value, None)))

    // ERROR without a throwable still maps to an error event with no stack trace.
    appender.append(mkErrorLoggingEvent("error", None))
    verify(logsWriter).write(mockitoEq(LogEvent("id", "error", 1L, MistLogging.Level.Error.value, None)))

    // ERROR with a throwable carries the exception into the event.
    val error = Some(new RuntimeException("test"))
    appender.append(mkErrorLoggingEvent("error", error))
    verify(logsWriter).write(mockitoEq(
      LogEvent.mkError("id", "error", error, 1L)
    ))

    // Unmapped levels (FATAL) fall through to Info, rendered via SimpleLayout
    // ("LEVEL - message\n").
    appender.append(mkLoggingEvent(Level.FATAL, "unknown level"))
    verify(logsWriter).write(mockitoEq(LogEvent("id", "FATAL - unknown level\n", 1L, MistLogging.Level.Info.value, None)))

  }

  // Builds a log4j event with a fixed timestamp (1L) for deterministic assertions.
  def mkLoggingEvent(level: Level, msg: String): LoggingEvent =
    new LoggingEvent(classOf[Category].getName, noopLogger, 1L, level, msg, null)

  // ERROR-level variant; ex is attached as the event's throwable when present.
  def mkErrorLoggingEvent(msg: String, ex: Option[Throwable]): LoggingEvent = {
    new LoggingEvent(classOf[Category].getName, noopLogger, 1L, Level.ERROR, msg, ex.orNull)
  }
}
Loading

0 comments on commit f25a89b

Please sign in to comment.