Hi,
when going through your examples for DAAL with Spark, I noticed that you only provide samples for locally stored, dense data, namely:
- reading from a text file stored locally, and not from an HDFS system etc.
- processing these lines into Vectors.dense, which is again a dense, local data structure.
- reading in a full matrix (as opposed to a sparse matrix representation such as the COO format).
Especially the last point is currently giving me a headache: when I provide a file stored in the format (row_index, column_index, value), I can still cast it (via CoordinateMatrix) to a RowMatrix, although it is stored differently internally.
Where your examples have a dense vector per row, the cast via CoordinateMatrix keeps the sparse representation for each row (i.e. two lists of equal length containing the indices and the values, respectively).
This results in the following error message for the SampleSVD example:
```
WARN TaskSetManager:66 - Lost task 0.0 in stage 4.0 (TID 8, 10.18.90.68, executor 2): java.lang.Exception: Number of rows in numeric table is incorrect
Details: Argument name: data
    at com.intel.daal.algorithms.AnalysisOnline.cCompute(Native Method)
    at com.intel.daal.algorithms.AnalysisOnline.compute(Unknown Source)
    at com.intel.daal.algorithms.svd.Online.compute(Unknown Source)
    at com.intel.daal.algorithms.svd.DistributedStep1Local.compute(Unknown Source)
    at SVD$$anonfun$3.apply(SVD.scala:126)
    at SVD$$anonfun$3.apply(SVD.scala:111)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
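For reference, this is roughly what I am doing (simplified; the file name and the separator are just placeholders):

```scala
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}

// Hypothetical input file, one "row_index,column_index,value" triple per line.
val entries = sc.textFile("matrix_coo.txt").map { line =>
  val Array(i, j, v) = line.split(",")
  MatrixEntry(i.toLong, j.toLong, v.toDouble)
}

// Casting the CoordinateMatrix to a RowMatrix compiles and runs, but each row
// stays a SparseVector (index list + value list) instead of a DenseVector.
val rowMatrix: RowMatrix = new CoordinateMatrix(entries).toRowMatrix()
```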
Hi Dennis,
The DAAL implementation of SVD doesn't support input matrices in sparse format at the moment.
To use SVD you need to implement a proper conversion from the sparse format (CoordinateMatrix) to a dense one, because a simple Spark cast from CoordinateMatrix to RowMatrix just turns your data into a long table of size (number of entries) x 3, while you need a table of size (number of rows) x (number of columns).
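For example, a rough sketch of such a conversion could look like this (illustrative only; note that rows with no entries are dropped and the row order is not preserved):

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix
import org.apache.spark.rdd.RDD

// Expand a CoordinateMatrix into an RDD of dense rows of length numCols,
// filling the entries that are absent in the COO representation with zeros.
def toDenseRows(coo: CoordinateMatrix): RDD[Vector] = {
  val numCols = coo.numCols().toInt
  coo.entries
    .map(e => (e.i, (e.j.toInt, e.value)))
    .groupByKey()
    .map { case (_, cols) =>
      val values = new Array[Double](numCols)
      cols.foreach { case (j, v) => values(j) = v }
      Vectors.dense(values)
    }
}
```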
Is the described approach suitable for you?
I have successfully implemented an approach that does exactly that; it seems possible to perform the conversion internally so that it does not require local aggregation of a Vector RDD (I can share a code sample tomorrow).
Yet, even though the internal representation is now exactly the same as in the given code example, I am still running into an error telling me that the respective dimensions do not match (for any matrix that is not of size 4000x18). Reducing my own sample (with different values, and originally stored in COO format) to a 4000x18 matrix works; it therefore seems that the expected dimensions are fixed somewhere in the code.
Hi Dennis,
We will analyze the issue with the sizes of the result RDD.
Thanks so much for the continuous updates!
The good news first: the results on the provided 4000x18 SVD.txt now finally (roughly) match the results I obtain from sklearn's PCA implementation on the same data, and I verified that they are what I expected them to be.
Sadly, the algorithm still fails for any other matrix dimensions. I am not sure whether I have to specify the matrix dimensions somewhere, but from my understanding they should be detected automatically, right? The error is still the same:
java.lang.Exception: Number of rows in numeric table is incorrect
Hi Dennis,
Could you provide additional details about the matrix dimensions you use?
The internal structure of the input RDD also matters, e.g. the number of partitions and their sizes.
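For example, you could print the number of rows per partition like this (dataRDD stands for whatever RDD you feed into the algorithm):

```scala
// Show how many rows end up in each partition of the input RDD.
val partitionSizes = dataRDD
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()
partitionSizes.foreach { case (idx, n) => println(s"partition $idx: $n rows") }
```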
In this case I was testing a 100x100 matrix, just to see whether it would work.
I wasn't doing any special pre-processing in terms of partitioning, just calling sc.textFile() on a different file.
If it really is about the partitioning, I'm wondering why the provided source doesn't do anything like that either.
Do you happen to have any documentation on what exactly I have to take into account when calling the function?
As described in the SVD distributed processing section of the documentation (https://software.intel.com/en-us/daal-programming-guide-distributed-processing-5), each separate block of data must have a number of rows greater than or equal to the number of columns.
In your case this means that each partition of the input RDD must contain a sufficient number of rows (>= the number of columns).
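For instance, one way to enforce this is to cap the number of partitions based on the row and column counts (just a sketch; dataRDD and the hard-coded column count are placeholders for your data):

```scala
// Ensure every partition holds at least numCols rows by limiting
// the number of partitions to numRows / numCols.
val numRows = dataRDD.count()
val numCols = 100L  // known number of columns of the matrix
val maxPartitions = math.max(1L, numRows / numCols).toInt

val balanced =
  if (dataRDD.getNumPartitions > maxPartitions)
    dataRDD.repartition(maxPartitions)  // shuffles, but yields roughly even partitions
  else
    dataRDD
```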