Commit 77dbb32a authored by 赵威's avatar 赵威

add dingtalk msg

parent 19221c62
...@@ -40,6 +40,8 @@ lazy val root = (project in file(".")) ...@@ -40,6 +40,8 @@ lazy val root = (project in file("."))
libraryDependencies += "net.debasishg" %% "redisclient" % "3.30", libraryDependencies += "net.debasishg" %% "redisclient" % "3.30",
libraryDependencies += "com.lihaoyi" %% "requests" % "0.1.9",
libraryDependencies += flinkCore, libraryDependencies += flinkCore,
libraryDependencies += flinkScala, libraryDependencies += flinkScala,
libraryDependencies += flinkStreamingScala, libraryDependencies += flinkStreamingScala,
......
...@@ -12,7 +12,7 @@ import org.apache.flink.streaming.connectors.kafka.{ ...@@ -12,7 +12,7 @@ import org.apache.flink.streaming.connectors.kafka.{
import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types } import org.apache.flink.api.common.typeinfo.{ TypeHint, TypeInformation, Types }
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import com.alibaba.fastjson.JSON import com.alibaba.fastjson.JSON
import com.gmei.up.utils.{ UserInfo, ES, Redis } import com.gmei.up.utils.{ UserInfo, ES, Redis, DingTalk }
class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] { class UserInfoDeserializationSchema extends KafkaDeserializationSchema[UserInfo] {
override def isEndOfStream(t: UserInfo): Boolean = false override def isEndOfStream(t: UserInfo): Boolean = false
...@@ -34,8 +34,8 @@ object Main extends LazyLogging { ...@@ -34,8 +34,8 @@ object Main extends LazyLogging {
// val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1) // val query = ES.generateServiceDiaryRequest(List("光子嫩肤", "水光针"), List("丰胸", "胸部塑身"), -1)
// println("###############") // println("###############")
try { try {
val env = StreamExecutionEnvironment.getExecutionEnvironment
logger.info("UserPortrait Flink Started") logger.info("UserPortrait Flink Started")
val env = StreamExecutionEnvironment.getExecutionEnvironment
// // TODO read from config // // TODO read from config
val kafkaConsumerProperties = new Properties val kafkaConsumerProperties = new Properties
...@@ -120,6 +120,10 @@ object Main extends LazyLogging { ...@@ -120,6 +120,10 @@ object Main extends LazyLogging {
} catch { } catch {
case e: Throwable => case e: Throwable =>
DingTalk.send(
Map("method" -> "main", "error" -> e.getStackTrace.mkString("\n")),
contentType = "exception"
)
e.printStackTrace() e.printStackTrace()
} }
} }
package com.gmei.up.utils
import scala.collection.mutable.StringBuilder
import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
import java.net.URLEncoder
import java.util.Base64
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
object DingTalk {
def send(
msg: Map[String, String],
contentType: String = "msg",
mobiles: List[String] = List.empty[String]
): Unit = {
val url = mkUrl(contentType)
val builder = StringBuilder.newBuilder
builder.append("StreamingUserPortrait:\n")
for ((k, v) <- msg) {
builder.append(k)
builder.append(": ")
builder.append(v)
builder.append("\n")
}
makeRequest(builder.toString, contentType)
}
def makeRequest(msg: String, contentType: String): Unit =
try {
val url = mkUrl(contentType)
val data = ("msgtype" -> "text") ~ ("text" -> ("content" -> msg))
val s = requests.Session(
headers = Map("Content-Type" -> "application/json")
)
s.post(url, data = compact(render(data)))
} catch {
case e: Throwable =>
e.printStackTrace()
}
def mkUrl(contentType: String = "msg"): String = {
var secret = "SECa194492f061e5b422452baee55b45f95c68bcc9fdbfa0439e0fa3f36d25d8840"
var token = "c39b1c5344558f46a8d1450fafc20366e64434ec58578a94df12c0f4b03dbad8"
if (contentType != "msg") {
secret = "SECc404abeefb4985e4ad1e3189bfbb3932b0005109fb5ecddc1f1f1526a1996f9c"
token = "c90b88ddf6be2398eb72429ee897a24fe3e80e85b1a896a7e5e091baf39344ca"
}
val r = requests.get("https://api.github.com/users/lihaoyi")
val timestamp = System.currentTimeMillis()
val stringToSign = timestamp + "\n" + secret
val mac = Mac.getInstance("HmacSHA256")
mac.init(new SecretKeySpec(secret.getBytes("UTF-8"), "HmacSHA256"))
val signData = mac.doFinal(stringToSign.getBytes("UTF-8"))
val encoder = Base64.getEncoder()
val sign = URLEncoder.encode(new String(encoder.encodeToString(signData)), "UTF-8")
s"https://oapi.dingtalk.com/robot/send?access_token=${token}&sign=${sign}&timestamp=${timestamp}"
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment