*Hi,aprecio cualquier ayuda o *pointers en la dirección correcta
Mi prueba actual *scenario es el siguiendo.
Quiero procesar un *MongoDB colección, *anonymising algunos campos en él y almacenarlo
en otra Colección.
La medida de la colección es alrededor de 900 GB con 2.5 millones documentos
Siguiendo es el código.
Objeto *Anonymizer extiende *SparkRunner
#unknown{^*val *sqlContext = nuevo *SQLContext(*sc)
*MongoDBLoader(*conf, *sc, "producción").Carga(*MongoHadoopImplementationReader(*conf, *sc, "entrada").*rdd,
(*dbObject: *BSONObject)
#unknown{^*dbObject.Puesto("añade_campo", "John *Macclane")
*val *embedded = *dbObject.Consigue("*embedded").*asInstanceOf[*BasicDBObject]
*embedded.Puesto("nombreempresarial", Nombre.Primer_nombre)
*dbObject.Puesto("*embedded", *webRfq)
*val *notesWrapper = Opción(*dbObject.Consigue("*embedded_lista").*asInstanceOf[*java.*util.*ArrayList[*BasicDBObject]])
*notesWrapper Caso {
de partido Algunos(notas) =>
notas.*foreach((Nota: *BasicDBObject)
#nom.Puesto("texto", Nombre.Nombre)
})
caso Ninguno =>
}
*dbObject
}
)
}...
Y clase
de caso *MongoHadoopImplementationReader(*conf: *com.*typesafe.*config.*Config, *sc: *SparkContext, colección: Cadena)
#unknown{^*val *mongoConfig = Configuración nueva()
*mongoConfig.Conjunto("*mongo.Entrada.*uri",
*s"*mongodb:#unknown{^*conf.*getString("*replicant.*mongo_Anfitrión")}:27017#unknown{^*conf.*getString("*replicant.*mongo_*database")}.#Nom}")
*mongoConfig.Conjunto("*mongo.Entrada.Medida_de ruptura", "50")
*mongoConfig.Conjunto("*mongo.Entrada.Límite", "70")
*def *rdd: RDD[(Objeto, *BSONObject)]
#unknown{^*val *rdd = *sc.*newAPIHadoopRDD(
*mongoConfig,
*classOf[*MongoInputFormat],
*classOf[Objeto],
*classOf[*BSONObject])
*println("particiones de RDD: " + *rdd.Particiones.Longitud)
*rdd
}
}
Y clase
de caso *MongoDBLoader(*conf: *com.*typesafe.*config.*Config, *sc:*SparkContext, colección: Cadena)
#unknown{^*val *mongoConfig = Configuración nueva()
*mongoConfig.Conjunto("*mongo.Producción.*uri",
*s"*mongodb:#unknown{^*conf.*getString("*replicant.*mongo_Anfitrión")}:27017#unknown{^*conf.*getString("*replicant.*mongo_Producción_*database")}.#Nom}")
*def carga(*rdd: => RDD[(Objeto, *BSONObject)], *transformer: (*BSONObject) => *BSONObject)
#unknown{^*val *mongoRDD = *rdd.Mapa[(Objeto, *BSONObject)]((*tuple: (Objeto, *BSONObject))
#default{^(*null, *transformer(*tuple._2))
})
*mongoRDD.*saveAsNewAPIHadoopFile(
"Archivo:///esto-es-completamente-*unused",
*classOf[Objeto],
*classOf[*BSONObject],
*classOf[*MongoOutputFormat[Objeto, *BSONObject]],
*mongoConfig)
}
}
Estas carreras de código despacio. Tomando 9.5 horas en un 3 grupo de máquina para procesar
todo. Y después de 6 horas en un 30 grupo de máquina paré cuando era sólo
sobre a medias procesado.
Las máquinas son *ec2 *m3.Casos grandes. El *MongoDB vidas en otro EC2
caso dentro del mismo *VPC y mismo *subnet.
Intenté mirar a las opciones de configuración pero él parece que en la mayoría de
casos el *defaults es la manera de ir (número de núcleos, memoria, *etc).
Parece tengo algún *bottleneck *somewhere, pero no seguro nada. Y estoy pensando *Mongo no es capaz de manejar el paralelismo?
Cómo es el *RDDs almacenado en memoria?. Cuándo lo corro, veo consigo alrededor de 32000
particiones y las tareas crearon. Entonces mira para ir más despacio el procesando
hacia él adelanta (Esto puede ser debido a *mongo documenta ser más grande en
la segunda mitad de nuestro DB.).
Veo también que la ruptura es almacenada en *HDFS en *Spark y entonces leído y *BulkInserted
en *Mongo. Aun así hay mucho *HDFS espacio (como 30 actuaciones
por máquina) pero justo una fracción minúscula es utilizada. no lo ser mejor de llenar
esto más y sólo intentar insertar a *mongo cuándo más el dato es
disponible?.
Yo también probado para aumentar la medida de Ruptura, pero él *complains de no bastantes
recursos en el trabajador. Aun así no pienso las Rupturas son bastante grandes a de
hecho llenar el 6GB de memoria de cada nodo, cuando cuándo les almacena en
*HDFS es mucho menos que aquello.
Es allí cualquier cosa obvio (o no :)) que no estoy haciendo correctamente?. Es
esto la manera correcta de transformar una colección de *Mongo a *Mongo?. Es
allí otra manera?
--
Recibiste este mensaje porque eres *subscribed al *Google Grupos "*mongodb-grupo"
de usuario.
Para otro *MongoDB opciones de apoyo técnico, ve: *http://www.mongodb.org/sobre/apoyo/.
---
Recibiste este mensaje porque eres *subscribed al *Google Grupos "*mongodb-grupo" de usuario.
A *unsubscribe de este grupo y la parón que recibe *emails de él, enviar un *email a *mongodb-usuario+unsubscribe@xxxxxxxxxxxxxxxx.
A correo a este grupo, envía *email a *mongodb-user@xxxxxxxxxxxxxxxx.
Visita este grupo en *http://grupos.*google.*com/Grupo/*mongodb-usuario.
Para ver esta discusión en la visita de web *https://grupos.*google.*com/*d/*msgid/*mongodb-Usuario/1*b0061*d5-2*b5*f-4*df1-8874-1#uno54*fe95#uno126%40*googlegroups.*com.
Para más opciones, visita *https://grupos.*google.*com/*d/*optout.
| Hi,I appreciate any help or pointers in the right direction
My current test scenario is the following.
I want to process a MongoDB collection, anonymising some fields on it and
store it in another Collection.
The size of the collection is around 900 GB with 2.5 million documents
Following is the code.
object Anonymizer extends SparkRunner {
val sqlContext = new SQLContext(sc)
MongoDBLoader(conf, sc, "output").load(MongoHadoopImplementationReader(conf, sc, "input").rdd,
(dbObject: BSONObject) => {
dbObject.put("add_field", "John Macclane")
val embedded = dbObject.get("embedded").asInstanceOf[BasicDBObject]
embedded.put("business_name", Name.first_name)
dbObject.put("embedded", webRfq)
val notesWrapper = Option(dbObject.get("embedded_list").asInstanceOf[java.util.ArrayList[BasicDBObject]])
notesWrapper match {
case Some(notes) =>
notes.foreach((note: BasicDBObject) => {
note.put("text", Name.name)
})
case None =>
}
dbObject
}
)
}...
And
case class MongoHadoopImplementationReader(conf: com.typesafe.config.Config, sc: SparkContext, collection: String) {
val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri",
s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_database")}.${collection}")
mongoConfig.set("mongo.input.split_size", "50")
mongoConfig.set("mongo.input.limit", "70")
def rdd: RDD[(Object, BSONObject)] = {
val rdd = sc.newAPIHadoopRDD(
mongoConfig,
classOf[MongoInputFormat],
classOf[Object],
classOf[BSONObject])
println("RDD partitions: " + rdd.partitions.length)
rdd
}
}
And
case class MongoDBLoader(conf: com.typesafe.config.Config, sc:SparkContext, collection: String) {
val mongoConfig = new Configuration()
mongoConfig.set("mongo.output.uri",
s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_output_database")}.${collection}")
def load(rdd: => RDD[(Object, BSONObject)], transformer: (BSONObject) => BSONObject) = {
val mongoRDD = rdd.map[(Object, BSONObject)]((tuple: (Object, BSONObject)) => {
(null, transformer(tuple._2))
})
mongoRDD.saveAsNewAPIHadoopFile(
"file:///this-is-completely-unused",
classOf[Object],
classOf[BSONObject],
classOf[MongoOutputFormat[Object, BSONObject]],
mongoConfig)
}
}
This code runs slow. Taking 9.5 hours in a 3 machine cluster to process
all. And after 6 hours in a 30 machine cluster I stopped as it was only
about half processed.
The machines are ec2 m3.large instances. The MongoDB lives on another EC2
instance inside the same VPC and same subnet.
I tried to look into the configuration options but it seems that in most
cases the defaults are the way to go (number of cores, memory, etc).
It looks like I have some bottleneck somewhere, but not sure at all. And I
am thinking Mongo is not able to handle the parallelism?
How are the RDDs stored in memory?. When I run it, I see I get around 32000
partitions and tasks created. Then it looks to slow down the processing
towards it advance (This can be due to mongo documents being bigger at the
second half of our DB.).
I see as well that the split is stored in HDFS in Spark and then read and
BulkInserted in Mongo. However there is a lot of HDFS space (like 30 gigs
per machine) but just a tiny fraction is used. Wouldn't it be better to
fill this more and only try to insert into mongo when more data is
available?.
I also tried to increase the Split size, but it complains of not enough
resources on the worker. However I don't think the Splits are big enough to
actually fill the 6GB of memory of each node, as when it stores them on
HDFS is a lot less than that.
Is there anything obvious (or not :)) that I am not doing correctly?. Is
this the correct way to transform a collection from Mongo to Mongo?. Is
there another way?
--
You received this message because you are subscribed to the Google Groups "mongodb-user"
group.
For other MongoDB technical support options, see: http://www.mongodb.org/about/support/.
---
You received this message because you are subscribed to the Google Groups "mongodb-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to mongodb-user+unsubscribe@xxxxxxxxxxxxxxxx.
To post to this group, send email to mongodb-user@xxxxxxxxxxxxxxxx.
Visit this group at http://groups.google.com/group/mongodb-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/mongodb-user/1b0061d5-2b5f-4df1-8874-1a54fe95a126%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
|