When going through your examples for DAAL with Spark, I noticed that you only provide samples for local RDD structures, namely:
The last part in particular is currently giving me a headache: when I provide a file stored in the format (row_index, column_index, value),
I can still convert this (via CoordinateMatrix) to a RowMatrix, although it is stored differently internally:
where your examples have a dense vector per row, the conversion via CoordinateMatrix maintains the sparse representation for each row (i.e. two lists of equal length containing indices and values, respectively).
This results in the error message for the SampleSVD example:
WARN TaskSetManager:66 - Lost task 0.0 in stage 4.0 (TID 8, 10.18.90.68, executor 2): java.lang.Exception: Number of rows in numeric table is incorrect
Details: Argument name: data
    at com.intel.daal.algorithms.AnalysisOnline.cCompute(Native Method)
    at com.intel.daal.algorithms.AnalysisOnline.compute(Unknown Source)
    at com.intel.daal.algorithms.svd.Online.compute(Unknown Source)
    at com.intel.daal.algorithms.svd.DistributedStep1Local.compute(Unknown Source)
    at SVD$$anonfun$3.apply(SVD.scala:126)
    at SVD$$anonfun$3.apply(SVD.scala:111)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
The DAAL implementation of SVD does not support input matrices in sparse format at the moment.
To use SVD you need to implement a proper conversion from the sparse format (CoordinateMatrix) to dense, because a simple Spark cast from CoordinateMatrix to RowMatrix just turns your data into a long table of size (number of entries) x 3, while you need a table of size (number of rows) x (number of columns).
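To illustrate the conversion described above (this is a sketch, not code from the samples), the densification step can be written in plain Scala; in a real Spark job the same expansion would be applied to the entries of the CoordinateMatrix, e.g. per partition or after grouping by row index:

```scala
// Sketch: expand COO triples (rowIndex, colIndex, value) into a full
// nRows x nCols dense table, filling in the implicit zeros of the
// sparse format. This is the shape DAAL's SVD expects per block.
def cooToDense(entries: Seq[(Int, Int, Double)],
               nRows: Int, nCols: Int): Array[Array[Double]] = {
  val table = Array.fill(nRows, nCols)(0.0) // zeros absent from the COO file
  for ((i, j, v) <- entries) table(i)(j) = v
  table
}
```

With Spark, the entries of a CoordinateMatrix could be grouped by row index and each row densified the same way before handing the block to DAAL.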
Is the described approach suitable for you?
I have successfully implemented an approach that does exactly that; it seems to be possible to cast internally in such a way that no local aggregation of a Vector RDD is required (I can share a code sample tomorrow).
Yet, although the internal representation is now exactly the same as in the given code example, I am still running into an issue telling me that the respective dimensions do not match (for any input not of size 4000x18). Reducing my own sample (with different values, originally stored in COO format) to a 4000x18 matrix works; it thus seems that the expected dimensions are fixed somewhere in the code.
Thanks so much for the continuous updates!
The good news first: the results on the provided 4000x18 SVD.txt now finally (roughly) match those I obtain from sklearn's implementation of PCA, and I verified that they are what I expected.
Sadly, the algorithm still fails for any other matrix dimension. I am not sure whether I have to specify the matrix dimensions somewhere, but as I understand it they should be detected automatically, right? The error is still the same:
java.lang.Exception: Number of rows in numeric table is incorrect
Could you provide additional details about the matrix dimensions you use?
The internal structure of the input RDD also matters, e.g. the number of partitions and their sizes.
In this case I was testing a 100x100 matrix, just to see whether it would work.
I wasn't doing any special pre-processing in terms of partitioning, just using sc.textFile() on a different file.
If it really is about the partitioning, I wonder why the provided source does not do anything like that either.
Do you happen to have any documentation on what exactly I have to respect when calling the function?
As described in the SVD distributed processing section of the documentation (https://software.intel.com/en-us/daal-programming-guide-distributed-processing-5), each separate block of data should have a number of rows >= number of columns.
In your case this means that each partition of the input RDD should contain a sufficient number of rows (>= number of columns).
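A minimal sketch of that constraint (my reading of the requirement above, not code from the samples): compute an upper bound on the partition count so that every partition of an nRows x nCols matrix holds at least nCols rows; with Spark the result would then be passed to rdd.repartition or coalesce:

```scala
// Sketch (assumption: rows end up spread roughly evenly across partitions).
// DAAL's distributed SVD requires each block to have >= nCols rows, so
// floor(nRows / nCols) is the largest partition count for which the
// average partition still holds at least nCols rows.
def maxPartitions(nRows: Long, nCols: Long): Int = {
  require(nRows >= nCols, "need at least as many rows as columns overall")
  (nRows / nCols).toInt
}
```

For example, a 100x100 matrix must be kept in a single partition (which may explain the failure above if sc.textFile split it into several), while 4000x18 tolerates many partitions.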