Commit 6055fdc6 authored by 高雅喆

add node2vec

parent b819e27c
MIT License
Copyright (c) 2016 Aditya Grover
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# node2vec
This repository provides a reference implementation of *node2vec* as described in the paper:<br>
> node2vec: Scalable Feature Learning for Networks.<br>
> Aditya Grover and Jure Leskovec.<br>
> Knowledge Discovery and Data Mining, 2016.<br>
> https://arxiv.org/abs/1607.00653
The *node2vec* algorithm learns continuous representations for nodes in any (un)directed, (un)weighted graph. Please check the [project page](https://snap.stanford.edu/node2vec/) for more details.
### Basic Usage
#### Example
To run *node2vec* on Zachary's karate club network, execute the following command from the project home directory:<br/>
``python src/main.py --input graph/karate.edgelist --output emb/karate.emd``
#### Options
You can check out the other options available to use with *node2vec* using:<br/>
``python src/main.py --help``
#### Input
The supported input format is an edgelist:

``node1_id_int node2_id_int <weight_float, optional>``
The graph is assumed to be undirected and unweighted by default. These options can be changed by setting the appropriate flags.
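For example, to treat the input as a weighted, directed graph:<br/>
``python src/main.py --input <weighted_edgelist> --output <output_emb> --weighted --directed``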
#### Output
The output file has *n+1* lines for a graph with *n* vertices.
The first line has the following format:
``num_of_nodes dim_of_representation``
The next *n* lines are as follows:
``node_id dim1 dim2 ... dimd``
where dim1, ... , dimd is the *d*-dimensional representation learned by *node2vec*.
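For example, here is a minimal sketch (assuming the ``emb/karate.emd`` output path from the example above) that loads the embeddings into a dictionary of numpy vectors:

```
import numpy as np

def load_embeddings(path='emb/karate.emd'):
    # First line: "num_of_nodes dim_of_representation";
    # each following line: "node_id dim1 dim2 ... dimd".
    embeddings = {}
    with open(path) as f:
        header = f.readline().split()
        num_nodes, dim = int(header[0]), int(header[1])
        for line in f:
            parts = line.split()
            embeddings[parts[0]] = np.array([float(x) for x in parts[1:]])
    assert len(embeddings) == num_nodes
    return embeddings
```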
### Citing
If you find *node2vec* useful for your research, please consider citing the following paper:
@inproceedings{node2vec-kdd2016,
author = {Grover, Aditya and Leskovec, Jure},
title = {node2vec: Scalable Feature Learning for Networks},
booktitle = {Proceedings of the 22nd ACM SIGKDD International Conference on Knowledge Discovery and Data Mining},
year = {2016}
}
### Miscellaneous
Please send any questions you might have about the code and/or the algorithm to <adityag@cs.stanford.edu>.
*Note:* This is only a reference implementation of the *node2vec* algorithm and could benefit from several performance enhancement schemes, some of which are discussed in the paper.
1 32
1 22
1 20
1 18
1 14
1 13
1 12
1 11
1 9
1 8
1 7
1 6
1 5
1 4
1 3
1 2
2 31
2 22
2 20
2 18
2 14
2 8
2 4
2 3
3 14
3 9
3 10
3 33
3 29
3 28
3 8
3 4
4 14
4 13
4 8
5 11
5 7
6 17
6 11
6 7
7 17
9 34
9 33
9 31
10 34
14 34
15 34
15 33
16 34
16 33
19 34
19 33
20 34
21 34
21 33
23 34
23 33
24 30
24 34
24 33
24 28
24 26
25 32
25 28
25 26
26 32
27 34
27 30
28 34
29 34
29 32
30 34
30 33
31 34
31 33
32 34
32 33
33 34
# node2vec on spark
This library is a Scala implementation of *node2vec* for running on Spark, as described in the paper:
> node2vec: Scalable Feature Learning for Networks.
> Aditya Grover and Jure Leskovec.
> Knowledge Discovery and Data Mining, 2016.
> https://arxiv.org/abs/1607.00653
The *node2vec* algorithm learns continuous representations for nodes in any (un)directed, (un)weighted graph. Please check the [project page](https://snap.stanford.edu/node2vec/) for more details.
### Building node2vec_spark
**In order to build node2vec_spark, use the following:**
```
$ git clone https://github.com/Skarface-/node2vec.git
$ mvn clean package
```
**Requirements:**<br/>
Maven 3.0.5 or newer<br/>
Java 7+<br/>
Scala 2.10 or newer.
This will produce a jar file in ``node2vec_spark/target/``.
### Examples
This library has two functions: *randomwalk* and *embedding*. <br/>
These are described in the papers [node2vec: Scalable Feature Learning for Networks](http://arxiv.org/abs/1607.00653) and [Efficient Estimation of Word Representations in Vector Space](https://arxiv.org/abs/1301.3781).
### Random walk
Example:
```
./spark-submit --class com.navercorp.Main \
     ./node2vec_spark/target/node2vec-0.0.1-SNAPSHOT.jar \
     --cmd randomwalk --p 100.0 --q 100.0 --walkLength 40 \
     --input <input> --output <output>
```
#### Options
Invoke a command without arguments to list available arguments and their default values:
```
--cmd COMMAND
Functions: randomwalk or embedding. To run both "randomwalk" and "embedding" sequentially, input "node2vec". Default is "node2vec".
--input [INPUT]
Input edgelist path. The supported input format is an edgelist: "node1_id_int node2_id_int <weight_float, optional>"
--output [OUTPUT]
Output path for the random paths.
--walkLength WALK_LENGTH
Length of walk per source. Default is 80.
--numWalks NUM_WALKS
Number of walks per source. Default is 10.
--p P
Return hyperparameter p. Default is 1.0.
--q Q
In-out hyperparameter q. Default is 1.0.
--weighted Boolean
Specifying (un)weighted. Default is true.
--directed Boolean
Specifying (un)directed. Default is false.
--degree UPPER_BOUND_OF_NUMBER_OF_NEIGHBORS
Specifying upper bound of number of neighbors. Default is 30.
--indexed Boolean
Specifying whether nodes in edgelist are indexed or not. Default is true.
```
* If "indexed" is set to false, *node2vec_spark* index nodes in input edgelist, example: <br/>
**unindexed edgelist:**<br/>
node1 node2 1.0<br/>
node2 node7 1.0<br/>
**indexed edgelist:**<br/>
1 2 1.0<br/>
2 3 1.0<br/>
**node2id mapping:**<br/>
1 node1<br/>
2 node2<br/>
3 node7
#### Input
The supported input format is an edgelist:

``node1_id_int node2_id_int <weight_float, optional>``

or, with the option "indexed" set to false:

``node1_str node2_str <weight_float, optional>``
#### Output
The output file contains (number of nodes) * numWalks random paths, one per line, as follows:

``src_node_id_int node1_id_int node2_id_int ... noden_id_int``
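For instance, a minimal Python sketch that reads the emitted paths back in as lists of node ids (the part-file name below is a placeholder for a file inside the output directory, and splitting on whitespace is an assumption):

```
def read_walks(path):
    # Each non-empty line is one walk: the source node id followed by the visited node ids.
    walks = []
    with open(path) as f:
        for line in f:
            if line.strip():
                walks.append([int(node_id) for node_id in line.split()])
    return walks

walks = read_walks('<output>/part-00000')  # placeholder part file
```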
### Embedding random paths
Example:
```
./spark-submit --class com.navercorp.Main \
     ./node2vec_spark/target/node2vec-0.0.1-SNAPSHOT.jar \
     --cmd embedding --dim 50 --iter 20 \
     --input <input> --nodePath <node2id_path> --output <output>
```
#### Options
Invoke a command without arguments to list available arguments and their default values:
```
--cmd COMMAND
embedding. To run both "randomwalk" and "embedding" sequentially, input "node2vec". Default is "node2vec".
--input [INPUT]
Input random paths. The supported input format is: "src_node_id_int node1_id_int ... noden_id_int"
--output [OUTPUT]
Output path for the word2vec model (.bin) and the embeddings (.emb).
--nodePath [NODE\_PATH]
Input node2index path. The supported input format: "node1_str node1_id_int"
--iter ITERATION
Number of epochs in SGD. Default 10.
--dim DIMENSION
Number of dimensions. Default is 128.
--window WINDOW_SIZE
Context size for optimization. Default is 10.
```
#### Input
The supported input format is random paths:

``src_node_id_int node1_id_int ... noden_id_int``
#### Output
The output files are **embeddings and a word2vec model.** The embeddings file has the following format:

``node1_str dim1 dim2 ... dimd``

where dim1, ... , dimd is the *d*-dimensional representation learned by word2vec.
The *word2vec model* output uses the Spark word2vec model format; see https://spark.apache.org/docs/1.5.2/mllib-feature-extraction.html#word2vec for details.
## References
1. [node2vec: Scalable Feature Learning for Networks](http://arxiv.org/abs/1607.00653)
2. [Efficient Estimation of Word Representations in Vector Space](https://arxiv.org/abs/1301.3781)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.navercorp</groupId>
<artifactId>node2vec</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<name>node2vec_spark</name>
<url>http://snap.stanford.edu/node2vec/</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<shadedClassifier>bin</shadedClassifier>
<maven-shade-plugin.version>2.4.3</maven-shade-plugin.version>
<exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
<java.version>1.7</java.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>false</skip>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.binary.version}.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.binary.version}</artifactId>
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_${scala.binary.version}</artifactId>
<version>3.3.0</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
</dependencies>
</project>
appender.out.type = Console
appender.out.name = out
appender.out.layout.type = PatternLayout
appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
logger.springframework.name = org.springframework
logger.springframework.level = WARN
rootLogger.level = INFO
rootLogger.appenderRef.out.ref = out
package com.navercorp
import java.io.Serializable
import org.apache.spark.{SparkContext, SparkConf}
import scopt.OptionParser
import com.navercorp.lib.AbstractParams
object Main {
object Command extends Enumeration {
type Command = Value
val node2vec, randomwalk, embedding = Value
}
import Command._
case class Params(iter: Int = 10,
lr: Double = 0.025,
numPartition: Int = 10,
dim: Int = 128,
window: Int = 10,
walkLength: Int = 80,
numWalks: Int = 10,
p: Double = 1.0,
q: Double = 1.0,
weighted: Boolean = true,
directed: Boolean = false,
degree: Int = 30,
indexed: Boolean = false,
nodePath: String = null,
input: String = null,
output: String = null,
cmd: Command = Command.node2vec) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Node2Vec_Spark") {
head("Main")
opt[Int]("walkLength")
.text(s"walkLength: ${defaultParams.walkLength}")
.action((x, c) => c.copy(walkLength = x))
opt[Int]("numWalks")
.text(s"numWalks: ${defaultParams.numWalks}")
.action((x, c) => c.copy(numWalks = x))
opt[Double]("p")
.text(s"return parameter p: ${defaultParams.p}")
.action((x, c) => c.copy(p = x))
opt[Double]("q")
.text(s"in-out parameter q: ${defaultParams.q}")
.action((x, c) => c.copy(q = x))
opt[Boolean]("weighted")
.text(s"weighted: ${defaultParams.weighted}")
.action((x, c) => c.copy(weighted = x))
opt[Boolean]("directed")
.text(s"directed: ${defaultParams.directed}")
.action((x, c) => c.copy(directed = x))
opt[Int]("degree")
.text(s"degree: ${defaultParams.degree}")
.action((x, c) => c.copy(degree = x))
opt[Boolean]("indexed")
.text(s"Whether nodes are indexed or not: ${defaultParams.indexed}")
.action((x, c) => c.copy(indexed = x))
opt[String]("nodePath")
.text("Input node2index file path: empty")
.action((x, c) => c.copy(nodePath = x))
opt[String]("input")
.required()
.text("Input edge file path: empty")
.action((x, c) => c.copy(input = x))
opt[String]("output")
.required()
.text("Output path: empty")
.action((x, c) => c.copy(output = x))
opt[String]("cmd")
.required()
.text(s"command: ${defaultParams.cmd.toString}")
.action((x, c) => c.copy(cmd = Command.withName(x)))
note(
"""
|For example, the following command runs this app on a synthetic dataset:
|
| bin/spark-submit --class com.navercorp.Main \
""".stripMargin +
s"| --lr ${defaultParams.lr}" +
s"| --iter ${defaultParams.iter}" +
s"| --numPartition ${defaultParams.numPartition}" +
s"| --dim ${defaultParams.dim}" +
s"| --window ${defaultParams.window}" +
s"| --input <path>" +
s"| --node <nodeFilePath>" +
s"| --output <path>"
)
}
def main(args: Array[String]) = {
parser.parse(args, defaultParams).map { param =>
val conf = new SparkConf().setAppName("Node2Vec")
val context: SparkContext = new SparkContext(conf)
Node2vec.setup(context, param)
param.cmd match {
case Command.node2vec => Node2vec.load()
.initTransitionProb()
.randomWalk()
.embedding()
.save()
case Command.randomwalk => Node2vec.load()
.initTransitionProb()
.randomWalk()
.saveRandomPath()
case Command.embedding => {
val randomPaths = Word2vec.setup(context, param).read(param.input)
Word2vec.fit(randomPaths).save(param.output)
Node2vec.loadNode2Id(param.nodePath).saveVectors()
}
}
} getOrElse {
sys.exit(1)
}
}
}
package com.navercorp
import java.io.Serializable
import scala.util.Try
import scala.collection.mutable.ArrayBuffer
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.{EdgeTriplet, Graph, _}
import com.navercorp.graph.{GraphOps, EdgeAttr, NodeAttr}
object Node2vec extends Serializable {
lazy val logger: Logger = LoggerFactory.getLogger(getClass.getName);
var context: SparkContext = null
var config: Main.Params = null
var node2id: RDD[(String, Long)] = null
var indexedEdges: RDD[Edge[EdgeAttr]] = _
var indexedNodes: RDD[(VertexId, NodeAttr)] = _
var graph: Graph[NodeAttr, EdgeAttr] = _
var randomWalkPaths: RDD[(Long, ArrayBuffer[Long])] = null
def setup(context: SparkContext, param: Main.Params): this.type = {
this.context = context
this.config = param
this
}
def load(): this.type = {
val bcMaxDegree = context.broadcast(config.degree)
val bcEdgeCreator = config.directed match {
case true => context.broadcast(GraphOps.createDirectedEdge)
case false => context.broadcast(GraphOps.createUndirectedEdge)
}
val inputTriplets: RDD[(Long, Long, Double)] = config.indexed match {
case true => readIndexedGraph(config.input)
case false => indexingGraph(config.input)
}
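// Build each node's neighbor list; nodes with more than `degree` neighbors keep
// only their highest-weight edges.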
indexedNodes = inputTriplets.flatMap { case (srcId, dstId, weight) =>
bcEdgeCreator.value.apply(srcId, dstId, weight)
}.reduceByKey(_++_).map { case (nodeId, neighbors: Array[(VertexId, Double)]) =>
var neighbors_ = neighbors
if (neighbors_.length > bcMaxDegree.value) {
neighbors_ = neighbors.sortWith{ case (left, right) => left._2 > right._2 }.slice(0, bcMaxDegree.value)
}
(nodeId, NodeAttr(neighbors = neighbors_.distinct))
}.repartition(200).cache
indexedEdges = indexedNodes.flatMap { case (srcId, clickNode) =>
clickNode.neighbors.map { case (dstId, weight) =>
Edge(srcId, dstId, EdgeAttr())
}
}.repartition(200).cache
this
}
def initTransitionProb(): this.type = {
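// Precompute alias tables: each vertex gets one for the first step of a walk (and an
// initial two-node path), and each edge gets one biased by p and q for later steps.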
val bcP = context.broadcast(config.p)
val bcQ = context.broadcast(config.q)
graph = Graph(indexedNodes, indexedEdges)
.mapVertices[NodeAttr] { case (vertexId, clickNode) =>
val (j, q) = GraphOps.setupAlias(clickNode.neighbors)
val nextNodeIndex = GraphOps.drawAlias(j, q)
clickNode.path = Array(vertexId, clickNode.neighbors(nextNodeIndex)._1)
clickNode
}
.mapTriplets { edgeTriplet: EdgeTriplet[NodeAttr, EdgeAttr] =>
val (j, q) = GraphOps.setupEdgeAlias(bcP.value, bcQ.value)(edgeTriplet.srcId, edgeTriplet.srcAttr.neighbors, edgeTriplet.dstAttr.neighbors)
edgeTriplet.attr.J = j
edgeTriplet.attr.q = q
edgeTriplet.attr.dstNeighbors = edgeTriplet.dstAttr.neighbors.map(_._1)
edgeTriplet.attr
}.cache
this
}
def randomWalk(): this.type = {
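// Key every edge by its (srcId, dstId) pair so that a partially built walk can be
// joined with the alias table of its last traversed edge to draw the next node.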
val edge2attr = graph.triplets.map { edgeTriplet =>
(s"${edgeTriplet.srcId}${edgeTriplet.dstId}", edgeTriplet.attr)
}.repartition(200).cache
edge2attr.first
for (iter <- 0 until config.numWalks) {
var prevWalk: RDD[(Long, ArrayBuffer[Long])] = null
var randomWalk = graph.vertices.map { case (nodeId, clickNode) =>
val pathBuffer = new ArrayBuffer[Long]()
pathBuffer.append(clickNode.path:_*)
(nodeId, pathBuffer)
}.cache
var activeWalks = randomWalk.first
graph.unpersist(blocking = false)
graph.edges.unpersist(blocking = false)
for (walkCount <- 0 until config.walkLength) {
prevWalk = randomWalk
randomWalk = randomWalk.map { case (srcNodeId, pathBuffer) =>
val prevNodeId = pathBuffer(pathBuffer.length - 2)
val currentNodeId = pathBuffer.last
(s"$prevNodeId$currentNodeId", (srcNodeId, pathBuffer))
}.join(edge2attr).map { case (edge, ((srcNodeId, pathBuffer), attr)) =>
try {
val nextNodeIndex = GraphOps.drawAlias(attr.J, attr.q)
val nextNodeId = attr.dstNeighbors(nextNodeIndex)
pathBuffer.append(nextNodeId)
(srcNodeId, pathBuffer)
} catch {
case e: Exception => throw new RuntimeException(e.getMessage)
}
}.cache
activeWalks = randomWalk.first()
prevWalk.unpersist(blocking=false)
}
if (randomWalkPaths != null) {
val prevRandomWalkPaths = randomWalkPaths
randomWalkPaths = randomWalkPaths.union(randomWalk).cache()
randomWalkPaths.first
prevRandomWalkPaths.unpersist(blocking = false)
} else {
randomWalkPaths = randomWalk
}
}
this
}
def embedding(): this.type = {
val randomPaths = randomWalkPaths.map { case (vertexId, pathBuffer) =>
Try(pathBuffer.map(_.toString).toIterable).getOrElse(null)
}.filter(_!=null)
Word2vec.setup(context, config).fit(randomPaths)
this
}
def save(): this.type = {
this.saveRandomPath()
.saveModel()
.saveVectors()
}
def saveRandomPath(): this.type = {
randomWalkPaths
.map { case (vertexId, pathBuffer) =>
Try(pathBuffer.mkString("\t")).getOrElse(null)
}
.filter(x => x != null && x.replaceAll("\\s", "").length > 0)
.repartition(200)
.saveAsTextFile(config.output)
this
}
def saveModel(): this.type = {
Word2vec.save(config.output)
this
}
def saveVectors(): this.type = {
val node2vector = context.parallelize(Word2vec.getVectors.toList)
.map { case (nodeId, vector) =>
(nodeId.toLong, vector.mkString(","))
}
if (this.node2id != null) {
val id2Node = this.node2id.map{ case (strNode, index) =>
(index, strNode)
}
node2vector.join(id2Node)
.map { case (nodeId, (vector, name)) => s"$name\t$vector" }
.repartition(200)
.saveAsTextFile(s"${config.output}.emb")
} else {
node2vector.map { case (nodeId, vector) => s"$nodeId\t$vector" }
.repartition(200)
.saveAsTextFile(s"${config.output}.emb")
}
this
}
def cleanup(): this.type = {
node2id.unpersist(blocking = false)
indexedEdges.unpersist(blocking = false)
indexedNodes.unpersist(blocking = false)
graph.unpersist(blocking = false)
randomWalkPaths.unpersist(blocking = false)
this
}
def loadNode2Id(node2idPath: String): this.type = {
try {
this.node2id = context.textFile(node2idPath).map { node2index =>
val Array(strNode, index) = node2index.split("\\s")
(strNode, index.toLong)
}
} catch {
case e: Exception => logger.info("Failed to read node2index file.")
this.node2id = null
}
this
}
def readIndexedGraph(tripletPath: String) = {
val bcWeighted = context.broadcast(config.weighted)
val rawTriplets = context.textFile(tripletPath)
if (config.nodePath == null) {
this.node2id = createNode2Id(rawTriplets.map { triplet =>
val parts = triplet.split("\\s")
(parts.head, parts(1), -1)
})
} else {
loadNode2Id(config.nodePath)
}
rawTriplets.map { triplet =>
val parts = triplet.split("\\s")
val weight = bcWeighted.value match {
case true => Try(parts.last.toDouble).getOrElse(1.0)
case false => 1.0
}
(parts.head.toLong, parts(1).toLong, weight)
}
}
def indexingGraph(rawTripletPath: String): RDD[(Long, Long, Double)] = {
val rawEdges = context.textFile(rawTripletPath).map { triplet =>
val parts = triplet.split("\\s")
Try {
(parts.head, parts(1), Try(parts.last.toDouble).getOrElse(1.0))
}.getOrElse(null)
}.filter(_!=null)
this.node2id = createNode2Id(rawEdges)
rawEdges.map { case (src, dst, weight) =>
(src, (dst, weight))
}.join(node2id).map { case (src, (edge: (String, Double), srcIndex: Long)) =>
try {
val (dst: String, weight: Double) = edge
(dst, (srcIndex, weight))
} catch {
case e: Exception => null
}
}.filter(_!=null).join(node2id).map { case (dst, (edge: (Long, Double), dstIndex: Long)) =>
try {
val (srcIndex, weight) = edge
(srcIndex, dstIndex, weight)
} catch {
case e: Exception => null
}
}.filter(_!=null)
}
def createNode2Id[T <: Any](triplets: RDD[(String, String, T)]) = triplets.flatMap { case (src, dst, weight) =>
Try(Array(src, dst)).getOrElse(Array.empty[String])
}.distinct().zipWithIndex()
}
package com.navercorp
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.rdd.RDD
object Word2vec extends Serializable {
var context: SparkContext = null
var word2vec = new Word2Vec()
var model: Word2VecModel = null
def setup(context: SparkContext, param: Main.Params): this.type = {
this.context = context
/**
* model = sg
* update = hs
*/
word2vec.setLearningRate(param.lr)
.setNumIterations(param.iter)
.setNumPartitions(param.numPartition)
.setMinCount(0)
.setVectorSize(param.dim)
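// The context window is set via reflection on the private `window` field below
// so that the --window option takes effect.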
val word2vecWindowField = word2vec.getClass.getDeclaredField("org$apache$spark$mllib$feature$Word2Vec$$window")
word2vecWindowField.setAccessible(true)
word2vecWindowField.setInt(word2vec, param.window)
this
}
def read(path: String): RDD[Iterable[String]] = {
context.textFile(path).repartition(200).map(_.split("\\s").toSeq)
}
def fit(input: RDD[Iterable[String]]): this.type = {
model = word2vec.fit(input)
this
}
def save(outputPath: String): this.type = {
model.save(context, s"$outputPath.bin")
this
}
def load(path: String): this.type = {
model = Word2VecModel.load(context, path)
this
}
def getVectors = this.model.getVectors
}
package com.navercorp.graph
import scala.collection.mutable.ArrayBuffer
object GraphOps {
def setupAlias(nodeWeights: Array[(Long, Double)]): (Array[Int], Array[Double]) = {
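// Walker's alias method: build tables J (aliases) and q (acceptance probabilities)
// from the weights so that drawAlias can sample an index in O(1).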
val K = nodeWeights.length
val J = Array.fill(K)(0)
val q = Array.fill(K)(0.0)
val smaller = new ArrayBuffer[Int]()
val larger = new ArrayBuffer[Int]()
val sum = nodeWeights.map(_._2).sum
nodeWeights.zipWithIndex.foreach { case ((nodeId, weight), i) =>
q(i) = K * weight / sum
if (q(i) < 1.0) {
smaller.append(i)
} else {
larger.append(i)
}
}
while (smaller.nonEmpty && larger.nonEmpty) {
val small = smaller.remove(smaller.length - 1)
val large = larger.remove(larger.length - 1)
J(small) = large
q(large) = q(large) + q(small) - 1.0
if (q(large) < 1.0) smaller.append(large)
else larger.append(large)
}
(J, q)
}
def setupEdgeAlias(p: Double = 1.0, q: Double = 1.0)(srcId: Long, srcNeighbors: Array[(Long, Double)], dstNeighbors: Array[(Long, Double)]): (Array[Int], Array[Double]) = {
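// node2vec second-order bias for a step from srcId to a neighbor of dst: returning
// to srcId is weighted by 1/p, a common neighbor of src and dst keeps its weight,
// and any other neighbor is weighted by 1/q.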
val neighbors_ = dstNeighbors.map { case (dstNeighborId, weight) =>
var unnormProb = weight / q
if (srcId == dstNeighborId) unnormProb = weight / p
else if (srcNeighbors.exists(_._1 == dstNeighborId)) unnormProb = weight
(dstNeighborId, unnormProb)
}
setupAlias(neighbors_)
}
def drawAlias(J: Array[Int], q: Array[Double]): Int = {
val K = J.length
val kk = math.floor(math.random * K).toInt
if (math.random < q(kk)) kk
else J(kk)
}
lazy val createUndirectedEdge = (srcId: Long, dstId: Long, weight: Double) => {
Array(
(srcId, Array((dstId, weight))),
(dstId, Array((srcId, weight)))
)
}
lazy val createDirectedEdge = (srcId: Long, dstId: Long, weight: Double) => {
Array(
(srcId, Array((dstId, weight)))
)
}
}
package com.navercorp
import java.io.Serializable
package object graph {
case class NodeAttr(var neighbors: Array[(Long, Double)] = Array.empty[(Long, Double)],
var path: Array[Long] = Array.empty[Long]) extends Serializable
case class EdgeAttr(var dstNeighbors: Array[Long] = Array.empty[Long],
var J: Array[Int] = Array.empty[Int],
var q: Array[Double] = Array.empty[Double]) extends Serializable
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.lib
import scala.reflect.runtime.universe._
/**
* Abstract class for parameter case classes.
* This overrides the [[toString]] method to print all case class fields by name and value.
* @tparam T Concrete parameter class.
*/
abstract class AbstractParams[T: TypeTag] {
private def tag: TypeTag[T] = typeTag[T]
/**
* Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
* {
* [field name]:\t[field value]\n
* [field name]:\t[field value]\n
* ...
* }
*/
override def toString: String = {
val tpe = tag.tpe
val allAccessors = tpe.declarations.collect {
case m: MethodSymbol if m.isCaseAccessor => m
}
val mirror = runtimeMirror(getClass.getClassLoader)
val instanceMirror = mirror.reflect(this)
allAccessors.map { f =>
val paramName = f.name.toString
val fieldMirror = instanceMirror.reflectField(f)
val paramValue = fieldMirror.get
s" $paramName:\t$paramValue"
}.mkString("{\n", ",\n", "\n}")
}
}
'''
Reference implementation of node2vec.
Author: Aditya Grover
For more details, refer to the paper:
node2vec: Scalable Feature Learning for Networks
Aditya Grover and Jure Leskovec
Knowledge Discovery and Data Mining (KDD), 2016
'''
import argparse
import numpy as np
import networkx as nx
import node2vec
from gensim.models import Word2Vec
def parse_args():
'''
Parses the node2vec arguments.
'''
parser = argparse.ArgumentParser(description="Run node2vec.")
parser.add_argument('--input', nargs='?', default='graph/karate.edgelist',
help='Input graph path')
parser.add_argument('--output', nargs='?', default='emb/karate.emb',
help='Embeddings path')
parser.add_argument('--dimensions', type=int, default=128,
help='Number of dimensions. Default is 128.')
parser.add_argument('--walk-length', type=int, default=80,
help='Length of walk per source. Default is 80.')
parser.add_argument('--num-walks', type=int, default=10,
help='Number of walks per source. Default is 10.')
parser.add_argument('--window-size', type=int, default=10,
help='Context size for optimization. Default is 10.')
parser.add_argument('--iter', default=1, type=int,
help='Number of epochs in SGD')
parser.add_argument('--workers', type=int, default=8,
help='Number of parallel workers. Default is 8.')
parser.add_argument('--p', type=float, default=1,
help='Return hyperparameter. Default is 1.')
parser.add_argument('--q', type=float, default=1,
help='Inout hyperparameter. Default is 1.')
parser.add_argument('--weighted', dest='weighted', action='store_true',
help='Boolean specifying (un)weighted. Default is unweighted.')
parser.add_argument('--unweighted', dest='unweighted', action='store_false')
parser.set_defaults(weighted=False)
parser.add_argument('--directed', dest='directed', action='store_true',
help='Graph is (un)directed. Default is undirected.')
parser.add_argument('--undirected', dest='undirected', action='store_false')
parser.set_defaults(directed=False)
return parser.parse_args()
def read_graph():
'''
Reads the input network in networkx.
'''
if args.weighted:
G = nx.read_edgelist(args.input, nodetype=int, data=(('weight',float),), create_using=nx.DiGraph())
else:
G = nx.read_edgelist(args.input, nodetype=int, create_using=nx.DiGraph())
for edge in G.edges():
G[edge[0]][edge[1]]['weight'] = 1
if not args.directed:
G = G.to_undirected()
return G
def learn_embeddings(walks):
'''
Learn embeddings by optimizing the Skipgram objective using SGD.
'''
walks = [map(str, walk) for walk in walks]
model = Word2Vec(walks, size=args.dimensions, window=args.window_size, min_count=0, sg=1, workers=args.workers, iter=args.iter)
model.save_word2vec_format(args.output)
return
def main(args):
'''
Pipeline for representational learning for all nodes in a graph.
'''
nx_G = read_graph()
G = node2vec.Graph(nx_G, args.directed, args.p, args.q)
G.preprocess_transition_probs()
walks = G.simulate_walks(args.num_walks, args.walk_length)
learn_embeddings(walks)
if __name__ == "__main__":
args = parse_args()
main(args)
import numpy as np
import networkx as nx
import random
class Graph():
def __init__(self, nx_G, is_directed, p, q):
self.G = nx_G
self.is_directed = is_directed
self.p = p
self.q = q
def node2vec_walk(self, walk_length, start_node):
'''
Simulate a random walk starting from start node.
'''
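# The first step of a walk is drawn from the node's alias table; every later step is
# drawn from the (prev, cur) edge alias table, which encodes the p/q bias.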
G = self.G
alias_nodes = self.alias_nodes
alias_edges = self.alias_edges
walk = [start_node]
while len(walk) < walk_length:
cur = walk[-1]
cur_nbrs = sorted(G.neighbors(cur))
if len(cur_nbrs) > 0:
if len(walk) == 1:
walk.append(cur_nbrs[alias_draw(alias_nodes[cur][0], alias_nodes[cur][1])])
else:
prev = walk[-2]
next = cur_nbrs[alias_draw(alias_edges[(prev, cur)][0],
alias_edges[(prev, cur)][1])]
walk.append(next)
else:
break
return walk
def simulate_walks(self, num_walks, walk_length):
'''
Repeatedly simulate random walks from each node.
'''
G = self.G
walks = []
nodes = list(G.nodes())
print 'Walk iteration:'
for walk_iter in range(num_walks):
print str(walk_iter+1), '/', str(num_walks)
random.shuffle(nodes)
for node in nodes:
walks.append(self.node2vec_walk(walk_length=walk_length, start_node=node))
return walks
def get_alias_edge(self, src, dst):
'''
Get the alias edge setup lists for a given edge.
'''
G = self.G
p = self.p
q = self.q
unnormalized_probs = []
for dst_nbr in sorted(G.neighbors(dst)):
if dst_nbr == src:
unnormalized_probs.append(G[dst][dst_nbr]['weight']/p)
elif G.has_edge(dst_nbr, src):
unnormalized_probs.append(G[dst][dst_nbr]['weight'])
else:
unnormalized_probs.append(G[dst][dst_nbr]['weight']/q)
norm_const = sum(unnormalized_probs)
normalized_probs = [float(u_prob)/norm_const for u_prob in unnormalized_probs]
return alias_setup(normalized_probs)
def preprocess_transition_probs(self):
'''
Preprocessing of transition probabilities for guiding the random walks.
'''
G = self.G
is_directed = self.is_directed
alias_nodes = {}
for node in G.nodes():
unnormalized_probs = [G[node][nbr]['weight'] for nbr in sorted(G.neighbors(node))]
norm_const = sum(unnormalized_probs)
normalized_probs = [float(u_prob)/norm_const for u_prob in unnormalized_probs]
alias_nodes[node] = alias_setup(normalized_probs)
alias_edges = {}
triads = {}
if is_directed:
for edge in G.edges():
alias_edges[edge] = self.get_alias_edge(edge[0], edge[1])
else:
for edge in G.edges():
alias_edges[edge] = self.get_alias_edge(edge[0], edge[1])
alias_edges[(edge[1], edge[0])] = self.get_alias_edge(edge[1], edge[0])
self.alias_nodes = alias_nodes
self.alias_edges = alias_edges
return
def alias_setup(probs):
'''
Compute utility lists for non-uniform sampling from discrete distributions.
Refer to https://hips.seas.harvard.edu/blog/2013/03/03/the-alias-method-efficient-sampling-with-many-discrete-outcomes/
for details
'''
K = len(probs)
q = np.zeros(K)
J = np.zeros(K, dtype=np.int)
smaller = []
larger = []
for kk, prob in enumerate(probs):
q[kk] = K*prob
if q[kk] < 1.0:
smaller.append(kk)
else:
larger.append(kk)
while len(smaller) > 0 and len(larger) > 0:
small = smaller.pop()
large = larger.pop()
J[small] = large
q[large] = q[large] + q[small] - 1.0
if q[large] < 1.0:
smaller.append(large)
else:
larger.append(large)
return J, q
def alias_draw(J, q):
'''
Draw sample from a non-uniform discrete distribution using alias sampling.
'''
K = len(J)
kk = int(np.floor(np.random.rand()*K))
if np.random.rand() < q[kk]:
return kk
else:
return J[kk]