mongodb-user
[Arriba] [Todas las Listas]

[mongodb-Usuario] mongo-hadoop con Spark es despacio para mí, y añadie

To: mongodb-user <mongodb-user@xxxxxxxxxxxxxxxx>
Subject: [mongodb-Usuario] mongo-hadoop con Spark es despacio para mí, y añadiendo los nodos no parece para hacer cualquier noticeable diferencia
From: Carlo Scarioni <carlo.scarioni@xxxxxxxxx>
Date: Wed, 16 Sep 2015 03:27:22 -0700 (PDT)
Delivery-date: Wed, 16 Sep 2015 19:45:51 -0400
Dkim-signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=googlegroups.com; s=20120806; h=date:from:to:message-id:subject:mime-version:content-type :x-original-sender:reply-to:precedence:mailing-list:list-id :list-post:list-help:list-archive:sender:list-subscribe :list-unsubscribe; bh=8c/7l1JHtZdsYoI3ZNkvSNqZQwe6Y8QOcDyX/m+goeo=; b=bRfp7/ykt/om+vTq74ANvqbRGpyrfe+YwoJnKYy8VJL8qN278qAvADKMkAkmLcvFTa 1halPWD4glJ+6Y/sZtJbsaajimdQT/S+3+gp+x/HD81AFqlupSXLs3ZvPezrGKkG3GqT 4MdIKIhHX8ZKDI6GuAS81U2KoexBtCMkQu9b+QIwWQ9Gzkx6JSojg0oXOt6foo2iqIAl eJOC6mzgGpUuO12oReuR21mg2WueiaHSf7wiws5djyEEvG6VlVPvq7NVviLQUSxa4Swt QyC6cRtXiEv5bj4t779+NQb4Q7LMYslo3iPGJIVWUP7D4oWduWT4QHO+JxcsXj9JgsdU SN+A==
Envelope-to: traductor@xxxxxxxxxxx
List-archive: <http://groups.google.com/group/mongodb-use>
List-help: <http://groups.google.com/support/>, <mailto:mongodb-user+help@googlegroups.com>
List-id: <mongodb-user.googlegroups.com>
List-post: <http://groups.google.com/group/mongodb-user/post>, <mailto:mongodb-user@googlegroups.com>
List-subscribe: <http://groups.google.com/group/mongodb-user/subscribe>, <mailto:mongodb-user+subscribe@googlegroups.com>
List-unsubscribe: <mailto:googlegroups-manage+1044811755470+unsubscribe@googlegroups.com>, <http://groups.google.com/group/mongodb-user/subscribe>
Mailing-list: list mongodb-user@xxxxxxxxxxxxxxxx; contact mongodb-user+owners@xxxxxxxxxxxxxxxx
Reply-to: mongodb-user@xxxxxxxxxxxxxxxx
Sender: mongodb-user@xxxxxxxxxxxxxxxx
*Hi,aprecio cualquier ayuda o *pointers en la dirección correcta

Mi prueba actual *scenario es el siguiendo.

Quiero procesar un *MongoDB colección, *anonymising algunos campos en él y almacenarlo 
en otra Colección.

La medida de la colección es alrededor de 900 GB con 2.5 millones documentos

Siguiendo es el código.



Objeto *Anonymizer extiende *SparkRunner

  #unknown{^*val *sqlContext = nuevo *SQLContext(*sc)

  *MongoDBLoader(*conf, *sc, "producción").Carga(*MongoHadoopImplementationReader(*conf, *sc, "entrada").*rdd,
    (*dbObject: *BSONObject)
      #unknown{^*dbObject.Puesto("añade_campo", "John *Macclane")
      *val *embedded = *dbObject.Consigue("*embedded").*asInstanceOf[*BasicDBObject]
      *embedded.Puesto("nombreempresarial", Nombre.Primer_nombre)
      *dbObject.Puesto("*embedded", *webRfq)
      *val *notesWrapper = Opción(*dbObject.Consigue("*embedded_lista").*asInstanceOf[*java.*util.*ArrayList[*BasicDBObject]])
      *notesWrapper Caso {
        de partido Algunos(notas) =>
          notas.*foreach((Nota: *BasicDBObject)
            #nom.Puesto("texto", Nombre.Nombre)
          })
        caso Ninguno =>
      }
      *dbObject
    }
  )

}...

Y clase




de caso *MongoHadoopImplementationReader(*conf: *com.*typesafe.*config.*Config, *sc: *SparkContext, colección: Cadena)
  #unknown{^*val *mongoConfig = Configuración nueva()

  *mongoConfig.Conjunto("*mongo.Entrada.*uri",
    *s"*mongodb:#unknown{^*conf.*getString("*replicant.*mongo_Anfitrión")}:27017#unknown{^*conf.*getString("*replicant.*mongo_*database")}.#Nom}")
  *mongoConfig.Conjunto("*mongo.Entrada.Medida_de ruptura", "50")
  *mongoConfig.Conjunto("*mongo.Entrada.Límite", "70")


  *def *rdd: RDD[(Objeto, *BSONObject)]
    #unknown{^*val *rdd = *sc.*newAPIHadoopRDD(
      *mongoConfig,
      *classOf[*MongoInputFormat],
      *classOf[Objeto],
      *classOf[*BSONObject])
    *println("particiones de RDD: " + *rdd.Particiones.Longitud)
    *rdd
  }

}




Y clase 




de caso *MongoDBLoader(*conf: *com.*typesafe.*config.*Config, *sc:*SparkContext, colección: Cadena)

  #unknown{^*val *mongoConfig = Configuración nueva()

  *mongoConfig.Conjunto("*mongo.Producción.*uri",
    *s"*mongodb:#unknown{^*conf.*getString("*replicant.*mongo_Anfitrión")}:27017#unknown{^*conf.*getString("*replicant.*mongo_Producción_*database")}.#Nom}")

  *def carga(*rdd: => RDD[(Objeto, *BSONObject)], *transformer: (*BSONObject) => *BSONObject)

    #unknown{^*val *mongoRDD = *rdd.Mapa[(Objeto, *BSONObject)]((*tuple: (Objeto, *BSONObject))
      #default{^(*null, *transformer(*tuple._2))
    })

    *mongoRDD.*saveAsNewAPIHadoopFile(
      "Archivo:///esto-es-completamente-*unused",
      *classOf[Objeto],
      *classOf[*BSONObject],
      *classOf[*MongoOutputFormat[Objeto, *BSONObject]],
      *mongoConfig)
  }
}




Estas carreras de código despacio. Tomando 9.5 horas en un 3 grupo de máquina para procesar 
todo. Y después de 6 horas en un 30 grupo de máquina paré cuando era sólo 
sobre a medias procesado.

Las máquinas son *ec2 *m3.Casos grandes. El *MongoDB vidas en otro EC2 
caso dentro del mismo *VPC y mismo *subnet.

Intenté mirar a las opciones de configuración pero él parece que en la mayoría de 
casos el *defaults es la manera de ir (número de núcleos, memoria, *etc). 

Parece tengo algún *bottleneck *somewhere, pero no seguro nada. Y estoy pensando *Mongo no es capaz de manejar el paralelismo? 

Cómo es el *RDDs almacenado en memoria?. Cuándo lo corro, veo consigo alrededor de 32000 
particiones y las tareas crearon. Entonces mira para ir más despacio el procesando 
hacia él adelanta (Esto puede ser debido a *mongo documenta ser más grande en 
la segunda mitad de nuestro DB.).

Veo también que la ruptura es almacenada en *HDFS en *Spark y entonces leído y *BulkInserted 
en *Mongo. Aun así hay mucho *HDFS espacio (como 30 actuaciones 
por máquina) pero justo una fracción minúscula es utilizada.  no lo ser mejor de llenar 
esto más y sólo intentar insertar a *mongo cuándo más el dato es 
disponible?. 

Yo también probado para aumentar la medida de Ruptura, pero él *complains de no bastantes 
recursos en el trabajador. Aun así no pienso las Rupturas son bastante grandes a de 
hecho llenar el 6GB de memoria de cada nodo, cuando cuándo les almacena en 
*HDFS es mucho menos que aquello.

Es allí cualquier cosa obvio (o no :)) que no estoy haciendo correctamente?. Es 
esto la manera correcta de transformar una colección de *Mongo a *Mongo?. Es 
allí otra manera?



-- 
Recibiste este mensaje porque eres *subscribed al *Google Grupos "*mongodb-grupo"
de usuario.

Para otro *MongoDB opciones de apoyo técnico, ve: *http://www.mongodb.org/sobre/apoyo/.
--- 
Recibiste este mensaje porque eres *subscribed al *Google Grupos "*mongodb-grupo" de usuario.
A *unsubscribe de este grupo y la parón que recibe *emails de él, enviar un *email a *mongodb-usuario+unsubscribe@xxxxxxxxxxxxxxxx.
A correo a este grupo, envía *email a *mongodb-user@xxxxxxxxxxxxxxxx.
Visita este grupo en *http://grupos.*google.*com/Grupo/*mongodb-usuario.
Para ver esta discusión en la visita de web *https://grupos.*google.*com/*d/*msgid/*mongodb-Usuario/1*b0061*d5-2*b5*f-4*df1-8874-1#uno54*fe95#uno126%40*googlegroups.*com.
Para más opciones, visita *https://grupos.*google.*com/*d/*optout.
Hi,I appreciate any help or pointers in the right direction

My current test scenario is the following.

I want to process a MongoDB collection, anonymising some fields on it and 
store it in another Collection.

The size of the collection is around 900 GB with 2.5 million documents

Following is the code.



object Anonymizer extends SparkRunner {

  val sqlContext = new SQLContext(sc)

  MongoDBLoader(conf, sc, "output").load(MongoHadoopImplementationReader(conf, sc, "input").rdd,
    (dbObject: BSONObject) => {
      dbObject.put("add_field", "John Macclane")
      val embedded = dbObject.get("embedded").asInstanceOf[BasicDBObject]
      embedded.put("business_name", Name.first_name)
      dbObject.put("embedded", webRfq)
      val notesWrapper = Option(dbObject.get("embedded_list").asInstanceOf[java.util.ArrayList[BasicDBObject]])
      notesWrapper match {
        case Some(notes) =>
          notes.foreach((note: BasicDBObject) => {
            note.put("text", Name.name)
          })
        case None =>
      }
      dbObject
    }
  )

}...

And




case class MongoHadoopImplementationReader(conf: com.typesafe.config.Config, sc: SparkContext, collection: String) {
  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.input.uri",
    s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_database")}.${collection}")
  mongoConfig.set("mongo.input.split_size", "50")
  mongoConfig.set("mongo.input.limit", "70")


  def rdd: RDD[(Object, BSONObject)] = {
    val rdd = sc.newAPIHadoopRDD(
      mongoConfig,
      classOf[MongoInputFormat],
      classOf[Object],
      classOf[BSONObject])
    println("RDD partitions: " + rdd.partitions.length)
    rdd
  }

}




And 




case class MongoDBLoader(conf: com.typesafe.config.Config, sc:SparkContext, collection: String) {

  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.output.uri",
    s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_output_database")}.${collection}")

  def load(rdd: => RDD[(Object, BSONObject)], transformer: (BSONObject) => BSONObject) = {

    val mongoRDD = rdd.map[(Object, BSONObject)]((tuple: (Object, BSONObject)) => {
      (null, transformer(tuple._2))
    })

    mongoRDD.saveAsNewAPIHadoopFile(
      "file:///this-is-completely-unused",
      classOf[Object],
      classOf[BSONObject],
      classOf[MongoOutputFormat[Object, BSONObject]],
      mongoConfig)
  }
}




This code runs slow. Taking 9.5 hours in a 3 machine cluster to process 
all. And after 6 hours in a 30 machine cluster I stopped as it was only 
about half processed.

The machines are ec2 m3.large instances. The MongoDB lives on another EC2 
instance inside the same VPC and same subnet.

I tried to look into the configuration options but it seems that in most 
cases the defaults are the way to go (number of cores, memory, etc). 

It looks like I have some bottleneck somewhere, but not sure at all. And I 
am thinking Mongo is not able to handle the parallelism? 

How are the RDDs stored in memory?. When I run it, I see I get around 32000 
partitions and tasks created. Then it looks to slow down the processing 
towards it advance (This can be due to mongo documents being bigger at the 
second half of our DB.).

I see as well that the split is stored in HDFS in Spark and then read and 
BulkInserted in Mongo. However there is a lot of HDFS space (like 30 gigs 
per machine) but just a tiny fraction is used. Wouldn't it be better to 
fill this more and only try to insert into mongo when more data is 
available?. 

I also tried to increase the Split size, but it complains of not enough 
resources on the worker. However I don't think the Splits are big enough to 
actually fill the 6GB of memory of each node, as when it stores them on 
HDFS is a lot less than that.

Is there anything obvious (or not :)) that I am not doing correctly?. Is 
this the correct way to transform a collection from Mongo to Mongo?. Is 
there another way?



-- 
You received this message because you are subscribed to the Google Groups "mongodb-user"
group.

For other MongoDB technical support options, see: http://www.mongodb.org/about/support/.
--- 
You received this message because you are subscribed to the Google Groups "mongodb-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to mongodb-user+unsubscribe@xxxxxxxxxxxxxxxx.
To post to this group, send email to mongodb-user@xxxxxxxxxxxxxxxx.
Visit this group at http://groups.google.com/group/mongodb-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/mongodb-user/1b0061d5-2b5f-4df1-8874-1a54fe95a126%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
<Anterior por Tema] Tema Actual [Siguiente por Tema>
  • [mongodb-Usuario] mongo-hadoop con Spark es despacio para mí, y añadiendo los nodos no parece para hacer cualquier noticeable diferencia, Carlo Scarioni <=