diff --git a/mist-lib/src/main/scala/io/hydrosphere/mist/api/Logging.scala b/mist-lib/src/main/scala/io/hydrosphere/mist/api/Logging.scala
index 384209442..8d3c01600 100644
--- a/mist-lib/src/main/scala/io/hydrosphere/mist/api/Logging.scala
+++ b/mist-lib/src/main/scala/io/hydrosphere/mist/api/Logging.scala
@@ -1,6 +1,6 @@
 package io.hydrosphere.mist.api
 
-import io.hydrosphere.mist.api.logging.MistLogging.{LogEvent, LogsWriter, RemoteLogsWriter, Slf4jWriter}
+import io.hydrosphere.mist.api.logging.MistLogging.{LogEvent, Slf4jWriter}
 
 trait Logging extends ContextSupport {
 
@@ -13,7 +13,7 @@ trait Logging extends ContextSupport {
     this.jobId = conf.info.id
   }
 
-  def getLogger: MLogger = new MLogger(jobId, loggingConf)
+  def getLogger: MLogger = new MLogger(jobId)
 
   override private[mist] def stop(): Unit = {
     super.stop()
@@ -21,31 +21,10 @@ trait Logging extends ContextSupport {
 
 }
 
-class MLogger(
-  sourceId: String,
-  centralConf: Option[CentralLoggingConf]
-) {
+class MLogger(sourceId: String) {
 
   @transient
-  lazy val writer = centralConf match {
-    case Some(conf) => new LogsWriter {
-      import conf._
-
-      val slf4j = new Slf4jWriter(sourceId)
-      val remote = RemoteLogsWriter.getOrCreate(host, port)
-
-      override def write(e: LogEvent): Unit = {
-        slf4j.write(e)
-        remote.write(e)
-      }
-
-    }
-
-    case None =>
-      val slf4j = new Slf4jWriter(sourceId)
-      slf4j.write(LogEvent.mkInfo(sourceId, "Central logging is not configured, using default slf4j"))
-      slf4j
-  }
+  lazy val writer = new Slf4jWriter(sourceId)
 
   def debug(msg: String): Unit = {
     val e = LogEvent.mkDebug(sourceId, msg)
diff --git a/mist-lib/src/main/scala/io/hydrosphere/mist/api/logging/MistLogging.scala b/mist-lib/src/main/scala/io/hydrosphere/mist/api/logging/MistLogging.scala
index e1d1ba956..8b469baf9 100644
--- a/mist-lib/src/main/scala/io/hydrosphere/mist/api/logging/MistLogging.scala
+++ b/mist-lib/src/main/scala/io/hydrosphere/mist/api/logging/MistLogging.scala
@@ -11,6 +11,7 @@ import akka.stream.{ActorMaterializer, OverflowStrategy}
 import akka.util.ByteString
 import com.twitter.chill.{KryoPool, ScalaKryoInstantiator}
 import com.typesafe.config.ConfigFactory
+import io.hydrosphere.mist.api.logging.MistLogging.RemoteLogsWriter.Key
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.concurrent.Await
@@ -79,7 +80,6 @@ private [mist] object MistLogging {
       writer.toString
     })
 
-
     LogEvent(from, message, ts, Level.Error.value, errTrace)
   }
 
@@ -94,6 +94,7 @@ private [mist] object MistLogging {
 
     def write(e: LogEvent): Unit
 
+    def close(): Unit
   }
 
   class Slf4jWriter(sourceId: String) extends LogsWriter {
@@ -106,6 +107,9 @@ private [mist] object MistLogging {
       case Level.Warn => sl4jLogger.warn(e.message)
       case Level.Error => sl4jLogger.error(e.message, e)
     }
+
+    override def close(): Unit = ()
+
   }
 
   class RemoteLogsWriter(host: String, port: Int) extends LogsWriter {
@@ -144,13 +148,13 @@ private [mist] object MistLogging {
 
     def close(): Unit = {
       mat.shutdown()
+      RemoteLogsWriter.remove(Key(host, port))
       Await.result(sys.terminate(), Duration.Inf)
     }
 
   }
 
   object RemoteLogsWriter {
-
     val systemConfig = ConfigFactory.parseString(
       "akka.daemonic = on"
     )
@@ -159,14 +163,18 @@ private [mist] object MistLogging {
 
     private val writers = new ConcurrentHashMap[Key, RemoteLogsWriter]()
 
     def getOrCreate(host: String, port: Int): RemoteLogsWriter = {
+      getWith(host, port)(key => new RemoteLogsWriter(key.host, key.port))
+    }
+
+    def getWith(host: String, port: Int)(createFn: Key => RemoteLogsWriter): RemoteLogsWriter = {
       val key = Key(host, port)
       writers.computeIfAbsent(key, new java.util.function.Function[Key, RemoteLogsWriter] {
-        override def apply(k: Key): RemoteLogsWriter = {
-          new RemoteLogsWriter(k.host, k.port)
-        }
+        override def apply(k: Key): RemoteLogsWriter = createFn(k)
       })
     }
 
+    def remove(key: Key): RemoteLogsWriter = writers.remove(key)
+
     private case class Key(host: String, port: Int)
   }
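The MistLogging change above turns RemoteLogsWriter into a self-evicting cache: getOrCreate now delegates to getWith, and close() removes the (host, port) entry so a closed writer is never handed out again. A minimal sketch of the resulting semantics, assuming a reachable log collector on localhost:2005 (illustration only, not part of the patch):

    import io.hydrosphere.mist.api.logging.MistLogging.RemoteLogsWriter

    val first  = RemoteLogsWriter.getOrCreate("localhost", 2005)
    val second = RemoteLogsWriter.getOrCreate("localhost", 2005)
    assert(first eq second)    // one shared writer per (host, port)
    first.close()              // shuts the materializer down and evicts the cache entry
    val third = RemoteLogsWriter.getOrCreate("localhost", 2005)
    assert(!(third eq first))  // a fresh writer is built after eviction
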
diff --git a/mist-lib/src/main/scala/mist/api/MistExtrasDef.scala b/mist-lib/src/main/scala/mist/api/MistExtrasDef.scala
index 759f2bff6..a72fe80f5 100644
--- a/mist-lib/src/main/scala/mist/api/MistExtrasDef.scala
+++ b/mist-lib/src/main/scala/mist/api/MistExtrasDef.scala
@@ -17,7 +17,7 @@ object MistExtras {
     Extracted(new MistExtras(
       jobId = jobId,
       workerId = workerId,
-      logger = new MLogger(jobId, ctx.setupConf.loggingConf)
+      logger = new MLogger(jobId)
     ))
   })
 }
diff --git a/mist/worker/src/main/scala/io/hydrosphere/mist/worker/MasterBridge.scala b/mist/worker/src/main/scala/io/hydrosphere/mist/worker/MasterBridge.scala
index 5b24c232c..a94efd946 100644
--- a/mist/worker/src/main/scala/io/hydrosphere/mist/worker/MasterBridge.scala
+++ b/mist/worker/src/main/scala/io/hydrosphere/mist/worker/MasterBridge.scala
@@ -134,8 +134,8 @@ object MasterBridge {
       new NamedContext(
         sparkContext,
         id,
-        org.apache.spark.streaming.Duration(init.streamingDuration.toMillis),
-        Option(centralLoggingConf)
+        Option(centralLoggingConf),
+        org.apache.spark.streaming.Duration(init.streamingDuration.toMillis)
       )
     }
   }
diff --git a/mist/worker/src/main/scala/io/hydrosphere/mist/worker/NamedContext.scala b/mist/worker/src/main/scala/io/hydrosphere/mist/worker/NamedContext.scala
index 4359f0e2b..285f5e269 100644
--- a/mist/worker/src/main/scala/io/hydrosphere/mist/worker/NamedContext.scala
+++ b/mist/worker/src/main/scala/io/hydrosphere/mist/worker/NamedContext.scala
@@ -3,6 +3,7 @@ package io.hydrosphere.mist.worker
 import java.io.File
 
 import io.hydrosphere.mist.api.{CentralLoggingConf, RuntimeJobInfo, SetupConfiguration}
+import org.apache.commons.io.IOUtils
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveContext
@@ -14,8 +15,8 @@ import scala.collection.mutable
 class NamedContext(
   val sparkContext: SparkContext,
   val namespace: String,
-  streamingDuration: Duration = Duration(40 * 1000),
-  loggingConf: Option[CentralLoggingConf] = None
+  val loggingConf: Option[CentralLoggingConf] = None,
+  streamingDuration: Duration = Duration(40 * 1000)
 ) {
 
   private val jars = mutable.Buffer.empty[String]
diff --git a/mist/worker/src/main/scala/io/hydrosphere/mist/worker/RemoteAppender.scala b/mist/worker/src/main/scala/io/hydrosphere/mist/worker/RemoteAppender.scala
new file mode 100644
index 000000000..2d2ebfe3a
--- /dev/null
+++ b/mist/worker/src/main/scala/io/hydrosphere/mist/worker/RemoteAppender.scala
@@ -0,0 +1,47 @@
+package io.hydrosphere.mist.worker
+
+import io.hydrosphere.mist.api.CentralLoggingConf
+import io.hydrosphere.mist.api.logging.MistLogging
+import io.hydrosphere.mist.api.logging.MistLogging.{LogsWriter, RemoteLogsWriter}
+import org.apache.log4j.spi.LoggingEvent
+import org.apache.log4j.{AppenderSkeleton, Level, SimpleLayout}
+
+class RemoteAppender(sourceId: String, logsWriter: LogsWriter) extends AppenderSkeleton {
+
+  override def append(event: LoggingEvent): Unit = {
+
+    val timeStamp = event.timeStamp
+    val message = event.getRenderedMessage
+    val evt = event.getLevel match {
+      case Level.INFO => MistLogging.LogEvent.mkInfo(sourceId, message, timeStamp)
+      case Level.DEBUG => MistLogging.LogEvent.mkDebug(sourceId, message, timeStamp)
+      case Level.ERROR =>
+        MistLogging.LogEvent.mkError(
+          sourceId, message,
+          Option(event.getThrowableInformation).map(_.getThrowable),
+          timeStamp
+        )
+      case Level.WARN => MistLogging.LogEvent.mkWarn(sourceId, message, timeStamp)
+      case _ => MistLogging.LogEvent.mkInfo(sourceId, this.getLayout.format(event), timeStamp)
+    }
+    logsWriter.write(evt)
+  }
+
+  override def close(): Unit = logsWriter.close()
+
+  override def requiresLayout(): Boolean = true
+}
+
+
+object RemoteAppender {
+  def apply(sourceId: String, loggingConf: CentralLoggingConf): RemoteAppender =
+    create(sourceId, RemoteLogsWriter.getOrCreate(loggingConf.host, loggingConf.port))
+
+  def create(sourceId: String, logsWriter: LogsWriter): RemoteAppender = {
+    val jobLogsAppender = new RemoteAppender(sourceId, logsWriter)
+    jobLogsAppender.setLayout(new SimpleLayout)
+    jobLogsAppender.setName(sourceId)
+    jobLogsAppender
+  }
+
+}
\ No newline at end of file
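The new RemoteAppender adapts log4j events to MistLogging's LogEvent model (INFO, DEBUG, WARN and ERROR map directly; anything else is formatted through the layout and logged as info) and forwards them to a LogsWriter. A small usage sketch, using a hypothetical stdout-backed writer in place of RemoteLogsWriter (illustration only, not part of the patch):

    import io.hydrosphere.mist.api.logging.MistLogging.{LogEvent, LogsWriter}
    import org.apache.log4j.LogManager

    // Stand-in writer: prints events instead of shipping them to the master.
    val consoleWriter = new LogsWriter {
      override def write(e: LogEvent): Unit = println(e.message)
      override def close(): Unit = ()
    }

    val appender = RemoteAppender.create("job-1", consoleWriter)
    LogManager.getRootLogger.addAppender(appender)    // attach for the duration of a job
    LogManager.getLogger("example").info("routed to consoleWriter")
    LogManager.getRootLogger.removeAppender("job-1")  // detach by name; create() sets name = sourceId
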
diff --git a/mist/worker/src/main/scala/io/hydrosphere/mist/worker/WorkerActor.scala b/mist/worker/src/main/scala/io/hydrosphere/mist/worker/WorkerActor.scala
index e640938e1..5bdc44520 100644
--- a/mist/worker/src/main/scala/io/hydrosphere/mist/worker/WorkerActor.scala
+++ b/mist/worker/src/main/scala/io/hydrosphere/mist/worker/WorkerActor.scala
@@ -3,13 +3,14 @@ package io.hydrosphere.mist.worker
 import java.util.concurrent.Executors
 
 import akka.actor._
+import io.hydrosphere.mist.api.CentralLoggingConf
 import io.hydrosphere.mist.core.CommonData._
 import io.hydrosphere.mist.worker.runners._
 import mist.api.data.JsLikeData
+import org.apache.log4j.{Appender, LogManager}
 import org.apache.spark.streaming.StreamingContext
 import org.slf4j.LoggerFactory
 
-import scala.collection.mutable
 import scala.concurrent._
 import scala.util.{Failure, Success}
 
@@ -23,6 +24,8 @@ trait JobStarting {
   val runnerSelector: RunnerSelector
   val namedContext: NamedContext
   val artifactDownloader: ArtifactDownloader
+  protected val rootLogger = LogManager.getRootLogger
+
 
   protected final def startJob(
     req: RunJobRequest
@@ -72,7 +75,8 @@ trait JobStarting {
 class WorkerActor(
   val runnerSelector: RunnerSelector,
   val namedContext: NamedContext,
-  val artifactDownloader: ArtifactDownloader
+  val artifactDownloader: ArtifactDownloader,
+  mkAppender: String => Option[Appender]
 ) extends Actor with JobStarting with ActorLogging {
 
   implicit val ec = {
@@ -88,6 +92,7 @@ class WorkerActor(
 
   private def awaitRequest(): Receive = {
     case req: RunJobRequest =>
+      mkAppender(req.id).foreach(rootLogger.addAppender)
      val jobStarted = startJob(req)
       val unit = ExecutionUnit(sender(), jobStarted)
       context become running(unit)
@@ -101,6 +106,7 @@ class WorkerActor(
       sender() ! WorkerIsBusy(id)
     case resp: JobResponse =>
       log.info(s"Job execution done. Returning result $resp and become awaiting new request")
+      rootLogger.removeAppender(resp.id)
       execution.requester ! resp
       context become awaitRequest()
 
@@ -121,11 +127,13 @@ class WorkerActor(
       self ! PoisonPill
     case resp: JobResponse =>
       log.info(s"Job execution done. Returning result $resp and shutting down")
+      rootLogger.removeAppender(resp.id)
       execution.requester ! resp
       self ! PoisonPill
   }
 
   private def cancel(id: String, respond: ActorRef): Unit = {
+    rootLogger.removeAppender(id)
     namedContext.sparkContext.cancelJobGroup(id)
     StreamingContext.getActive().foreach( _.stop(stopSparkContext = false, stopGracefully = true))
     respond ! JobIsCancelled(id)
@@ -143,14 +151,19 @@ object WorkerActor {
   def props(
     context: NamedContext,
     artifactDownloader: ArtifactDownloader,
-    runnerSelector: RunnerSelector
+    runnerSelector: RunnerSelector,
+    mkAppender: String => Option[Appender]
   ): Props =
-    Props(new WorkerActor(runnerSelector, context, artifactDownloader))
+    Props(new WorkerActor(runnerSelector, context, artifactDownloader, mkAppender))
+
+  def props(context: NamedContext, artifactDownloader: ArtifactDownloader, runnerSelector: RunnerSelector): Props =
+    props(context, artifactDownloader, runnerSelector, mkAppenderF(context.loggingConf))
+
+  def props(context: NamedContext, artifactDownloader: ArtifactDownloader): Props =
+    props(context, artifactDownloader, new SimpleRunnerSelector)
+
+  def mkAppenderF(conf: Option[CentralLoggingConf]): String => Option[Appender] =
+    (id: String) => conf.map(c => RemoteAppender(id, c))
 
-  def props(
-    context: NamedContext,
-    artifactDownloader: ArtifactDownloader
-  ): Props =
-    Props(new WorkerActor(new SimpleRunnerSelector, context, artifactDownloader))
 
 }
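WorkerActor now owns the appender lifecycle: mkAppender(req.id) is attached to the root logger when a RunJobRequest arrives and the appender is removed by job id on completion or cancellation. A wiring sketch, assuming a namedContext and artifactDownloader are already in scope (illustration only, not part of the patch):

    // With central logging configured, each job gets its own RemoteAppender;
    // mkAppenderF(None) yields a factory that returns None, so nothing is attached.
    val props = WorkerActor.props(
      namedContext,
      artifactDownloader,
      new SimpleRunnerSelector,
      WorkerActor.mkAppenderF(namedContext.loggingConf)
    )
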
diff --git a/mist/worker/src/test/scala/io/hydrosphere/mist/worker/RemoteAppenderSpec.scala b/mist/worker/src/test/scala/io/hydrosphere/mist/worker/RemoteAppenderSpec.scala
new file mode 100644
index 000000000..8ea8d212b
--- /dev/null
+++ b/mist/worker/src/test/scala/io/hydrosphere/mist/worker/RemoteAppenderSpec.scala
@@ -0,0 +1,62 @@
+package io.hydrosphere.mist.worker
+
+import io.hydrosphere.mist.api.CentralLoggingConf
+import io.hydrosphere.mist.api.logging.MistLogging
+import io.hydrosphere.mist.api.logging.MistLogging.{LogEvent, LogsWriter}
+import io.hydrosphere.mist.core.MockitoSugar
+import org.apache.log4j.spi.{LoggingEvent, NOPLogger, NOPLoggerRepository}
+import org.apache.log4j.{Category, Level}
+import org.mockito.Matchers.{eq => mockitoEq}
+import org.mockito.Mockito._
+import org.scalatest.{FunSpecLike, Matchers}
+
+class RemoteAppenderSpec extends FunSpecLike with Matchers with MockitoSugar {
+
+
+  val noopLogger = new NOPLogger(new NOPLoggerRepository, "forTest")
+
+  it("should create appender with layout") {
+    val appender = RemoteAppender("id", CentralLoggingConf("localhost", 2005))
+    appender.getLayout should not be null
+    appender.close()
+  }
+
+  it("should send message to logs writer with correct level") {
+
+    val logsWriter = mock[LogsWriter]
+
+    doNothing()
+      .when(logsWriter).write(any[LogEvent])
+
+    val appender = RemoteAppender.create("id", logsWriter)
+
+    appender.append(mkLoggingEvent(Level.INFO, "info"))
+    verify(logsWriter).write(mockitoEq(LogEvent("id", "info", 1L, MistLogging.Level.Info.value, None)))
+
+    appender.append(mkLoggingEvent(Level.DEBUG, "debug"))
+    verify(logsWriter).write(mockitoEq(LogEvent("id", "debug", 1L, MistLogging.Level.Debug.value, None)))
+
+    appender.append(mkLoggingEvent(Level.WARN, "warn"))
+    verify(logsWriter).write(mockitoEq(LogEvent("id", "warn", 1L, MistLogging.Level.Warn.value, None)))
+
+    appender.append(mkErrorLoggingEvent("error", None))
+    verify(logsWriter).write(mockitoEq(LogEvent("id", "error", 1L, MistLogging.Level.Error.value, None)))
+
+    val error = Some(new RuntimeException("test"))
+    appender.append(mkErrorLoggingEvent("error", error))
+    verify(logsWriter).write(mockitoEq(
+      LogEvent.mkError("id", "error", error, 1L)
+    ))
+
+    appender.append(mkLoggingEvent(Level.FATAL, "unknown level"))
+    verify(logsWriter).write(mockitoEq(LogEvent("id", "FATAL - unknown level\n", 1L, MistLogging.Level.Info.value, None)))
+
+  }
+
+  def mkLoggingEvent(level: Level, msg: String): LoggingEvent =
+    new LoggingEvent(classOf[Category].getName, noopLogger, 1L, level, msg, null)
+
+  def mkErrorLoggingEvent(msg: String, ex: Option[Throwable]): LoggingEvent = {
+    new LoggingEvent(classOf[Category].getName, noopLogger, 1L, Level.ERROR, msg, ex.orNull)
+  }
+}
diff --git a/mist/worker/src/test/scala/io/hydrosphere/mist/worker/WorkerActorSpec.scala b/mist/worker/src/test/scala/io/hydrosphere/mist/worker/WorkerActorSpec.scala
index 9ed9ef7b5..6b1b0ce38 100644
--- a/mist/worker/src/test/scala/io/hydrosphere/mist/worker/WorkerActorSpec.scala
+++ b/mist/worker/src/test/scala/io/hydrosphere/mist/worker/WorkerActorSpec.scala
@@ -4,13 +4,18 @@ import java.io.File
 
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.testkit.{TestActorRef, TestKit, TestProbe}
+import io.hydrosphere.mist.api.CentralLoggingConf
 import io.hydrosphere.mist.core.CommonData._
 import io.hydrosphere.mist.core.MockitoSugar
 import io.hydrosphere.mist.worker.runners.{ArtifactDownloader, JobRunner, RunnerSelector}
 import mist.api.data.{JsLikeData, _}
+import org.apache.log4j.LogManager
 import org.apache.spark.{SparkConf, SparkContext}
 import org.scalatest._
 
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Promise}
+
 class WorkerActorSpec extends TestKit(ActorSystem("WorkerSpec"))
   with FunSpecLike
   with Matchers
@@ -196,6 +201,57 @@ class WorkerActorSpec extends TestKit(ActorSystem("WorkerSpec"))
     probe.send(worker, ForceShutdown)
     probe.expectTerminated(worker)
   }
+  describe("logging") {
+    val artifactDownloader = mock[ArtifactDownloader]
+
+    when(artifactDownloader.downloadArtifact(any[String]))
+      .thenSuccess(SparkArtifact(new File("doesn't matter"), "url"))
+
+    it("should add and remove appender") {
+      val completion = Promise[JsLikeData]
+      val runner = SuccessRunnerSelector(Await.result(completion.future, Duration.Inf))
+      val props = WorkerActor.props(context, artifactDownloader, runner, WorkerActor.mkAppenderF(Some(CentralLoggingConf("localhost", 2005))))
+
+      val worker = TestActorRef[WorkerActor](props)
+      val probe = TestProbe()
+      probe.send(worker, RunJobRequest("id", JobParams("path", "MyClass", Map.empty, action = Action.Execute)))
+      val appender = LogManager.getRootLogger.getAppender("id")
+      appender should not be null
+      completion.success(JsLikeNumber(42))
+      probe.expectMsgAllConformingOf(
+        classOf[JobFileDownloading],
+        classOf[JobStarted],
+        classOf[JobResponse]
+      )
+      val appender1 = LogManager.getRootLogger.getAppender("id")
+      appender1 shouldBe null
+    }
+
+    it("should remove appender when cancelling job") {
+      val completion = Promise[JsLikeData]
+      val runner = SuccessRunnerSelector(Await.result(completion.future, Duration.Inf))
+      val props = WorkerActor.props(context, artifactDownloader, runner, WorkerActor.mkAppenderF(Some(CentralLoggingConf("localhost", 2005))))
+
+      val worker = TestActorRef[WorkerActor](props)
+      val probe = TestProbe()
+      probe.send(worker, RunJobRequest("id", JobParams("path", "MyClass", Map.empty, action = Action.Execute)))
+
+      val appender = LogManager.getRootLogger.getAppender("id")
+      appender should not be null
+
+      probe.expectMsgAllConformingOf(
+        classOf[JobFileDownloading],
+        classOf[JobStarted]
+      )
+
+      probe.send(worker, CancelJobRequest("id"))
+      probe.expectMsgType[JobIsCancelled]
+
+      val appender1 = LogManager.getRootLogger.getAppender("id")
+      appender1 shouldBe null
+      completion.success(JsLikeNull)
+    }
+  }
 
 
   def RunnerSelector(r: JobRunner): RunnerSelector = new RunnerSelector {