#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
MLlib utilities for linear algebra. For dense vectors, MLlib
uses the NumPy `array` type, so you can simply pass NumPy arrays
around. For sparse vectors, users can construct a :class:`SparseVector`
object from MLlib or pass SciPy `scipy.sparse` column vectors if
SciPy is available in their environment.
"""

import sys
import array
import struct

import numpy as np

from pyspark import since
from pyspark.ml import linalg as newlinalg
from pyspark.sql.types import (
    UserDefinedType,
    StructField,
    StructType,
    ArrayType,
    DoubleType,
    IntegerType,
    ByteType,
    BooleanType,
)
from typing import (
    Any,
    Callable,
    cast,
    Dict,
    Generic,
    Iterable,
    List,
    Optional,
    overload,
    Sequence,
    Tuple,
    Type,
    TypeVar,
    TYPE_CHECKING,
    Union,
)

if TYPE_CHECKING:
    from pyspark.mllib._typing import VectorLike, NormType
    from scipy.sparse import spmatrix
    from numpy.typing import ArrayLike


QT = TypeVar("QT")
RT = TypeVar("RT")


__all__ = [
    "Vector",
    "DenseVector",
    "SparseVector",
    "Vectors",
    "Matrix",
    "DenseMatrix",
    "SparseMatrix",
    "Matrices",
    "QRDecomposition",
]


# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
try:
    import scipy.sparse

    _have_scipy = True
except BaseException:
    # No SciPy in environment, but that's okay
    _have_scipy = False


def _convert_to_vector(d: Union["VectorLike", "spmatrix", range]) -> "Vector":
    if isinstance(d, Vector):
        return d
    elif type(d) in (array.array, np.array, np.ndarray, list, tuple, range):
        return DenseVector(d)
    elif _have_scipy and scipy.sparse.issparse(d):
        assert cast("spmatrix", d).shape[1] == 1, "Expected column vector"
        # Make sure the converted csc_matrix has sorted indices.
        csc = cast("spmatrix", d).tocsc()
        if not csc.has_sorted_indices:
            csc.sort_indices()
        return SparseVector(cast("spmatrix", d).shape[0], csc.indices, csc.data)
    else:
        raise TypeError("Cannot convert type %s into Vector" % type(d))


def _vector_size(v: Union["VectorLike", "spmatrix", range]) -> int:
    """
    Returns the size of the vector.

    Examples
    --------
    >>> _vector_size([1., 2., 3.])
    3
    >>> _vector_size((1., 2., 3.))
    3
    >>> _vector_size(array.array('d', [1., 2., 3.]))
    3
    >>> _vector_size(np.zeros(3))
    3
    >>> _vector_size(np.zeros((3, 1)))
    3
    >>> _vector_size(np.zeros((1, 3)))
    Traceback (most recent call last):
        ...
    ValueError: Cannot treat an ndarray of shape (1, 3) as a vector
    """
    if isinstance(v, Vector):
        return len(v)
    elif type(v) in (array.array, list, tuple, range):
        return len(v)
    elif type(v) == np.ndarray:
        if v.ndim == 1 or (v.ndim == 2 and v.shape[1] == 1):
            return len(v)
        else:
            raise ValueError("Cannot treat an ndarray of shape %s as a vector" % str(v.shape))
    elif _have_scipy and scipy.sparse.issparse(v):
        assert cast("spmatrix", v).shape[1] == 1, "Expected column vector"
        return cast("spmatrix", v).shape[0]
    else:
        raise TypeError("Cannot treat type %s as a vector" % type(v))


def _format_float(f: float, digits: int = 4) -> str:
    s = str(round(f, digits))
    if "." in s:
        s = s[: s.index(".") + 1 + digits]
    return s


def _format_float_list(xs: Iterable[float]) -> List[str]:
    return [_format_float(x) for x in xs]


def _double_to_long_bits(value: float) -> int:
    if np.isnan(value):
        value = float("nan")
    # pack double into 64 bits, then unpack as long int
    return struct.unpack("Q", struct.pack("d", value))[0]


class VectorUDT(UserDefinedType):
    """
    SQL user-defined type (UDT) for Vector.
    """

    @classmethod
    def sqlType(cls) -> StructType:
        return StructType(
            [
                StructField("type", ByteType(), False),
                StructField("size", IntegerType(), True),
                StructField("indices", ArrayType(IntegerType(), False), True),
                StructField("values", ArrayType(DoubleType(), False), True),
            ]
        )

    @classmethod
    def module(cls) -> str:
        return "pyspark.mllib.linalg"

    @classmethod
    def scalaUDT(cls) -> str:
        return "org.apache.spark.mllib.linalg.VectorUDT"

    def serialize(
        self, obj: "Vector"
    ) -> Tuple[int, Optional[int], Optional[List[int]], List[float]]:
        if isinstance(obj, SparseVector):
            indices = [int(i) for i in obj.indices]
            values = [float(v) for v in obj.values]
            return (0, obj.size, indices, values)
        elif isinstance(obj, DenseVector):
            values = [float(v) for v in obj]  # type: ignore[attr-defined]
            return (1, None, None, values)
        else:
            raise TypeError("cannot serialize %r of type %r" % (obj, type(obj)))

    def deserialize(
        self, datum: Tuple[int, Optional[int], Optional[List[int]], List[float]]
    ) -> "Vector":
        assert (
            len(datum) == 4
        ), "VectorUDT.deserialize given row with length %d but requires 4" % len(datum)
        tpe = datum[0]
        if tpe == 0:
            return SparseVector(cast(int, datum[1]), cast(List[int], datum[2]), datum[3])
        elif tpe == 1:
            return DenseVector(datum[3])
        else:
            raise ValueError("do not recognize type %r" % tpe)

    def simpleString(self) -> str:
        return "vector"


class MatrixUDT(UserDefinedType):
    """
    SQL user-defined type (UDT) for Matrix.
"""@classmethoddefsqlType(cls)->StructType:returnStructType([StructField("type",ByteType(),False),StructField("numRows",IntegerType(),False),StructField("numCols",IntegerType(),False),StructField("colPtrs",ArrayType(IntegerType(),False),True),StructField("rowIndices",ArrayType(IntegerType(),False),True),StructField("values",ArrayType(DoubleType(),False),True),StructField("isTransposed",BooleanType(),False),])@classmethoddefmodule(cls)->str:return"pyspark.mllib.linalg"@classmethoddefscalaUDT(cls)->str:return"org.apache.spark.mllib.linalg.MatrixUDT"defserialize(self,obj:"Matrix")->Tuple[int,int,int,Optional[List[int]],Optional[List[int]],List[float],bool]:ifisinstance(obj,SparseMatrix):colPtrs=[int(i)foriinobj.colPtrs]rowIndices=[int(i)foriinobj.rowIndices]values=[float(v)forvinobj.values]return(0,obj.numRows,obj.numCols,colPtrs,rowIndices,values,bool(obj.isTransposed),)elifisinstance(obj,DenseMatrix):values=[float(v)forvinobj.values]return(1,obj.numRows,obj.numCols,None,None,values,bool(obj.isTransposed))else:raiseTypeError("cannot serialize type %r"%(type(obj)))defdeserialize(self,datum:Tuple[int,int,int,Optional[List[int]],Optional[List[int]],List[float],bool],)->"Matrix":assert(len(datum)==7),"MatrixUDT.deserialize given row with length %d but requires 7"%len(datum)tpe=datum[0]iftpe==0:returnSparseMatrix(datum[1],datum[2],cast(List[int],datum[3]),cast(List[int],datum[4]),datum[5],datum[6],)eliftpe==1:returnDenseMatrix(datum[1],datum[2],datum[5],datum[6])else:raiseValueError("do not recognize type %r"%tpe)defsimpleString(self)->str:return"matrix"


class Vector:

    __UDT__ = VectorUDT()

    """
    Abstract class for DenseVector and SparseVector
    """

    def toArray(self) -> np.ndarray:
        """
        Convert the vector into a numpy.ndarray

        Returns
        -------
        :py:class:`numpy.ndarray`
        """
        raise NotImplementedError

    def asML(self) -> newlinalg.Vector:
        """
        Convert this vector to the new mllib-local representation.
        This does NOT copy the data; it copies references.

        Returns
        -------
        :py:class:`pyspark.ml.linalg.Vector`
        """
        raise NotImplementedError

    def __len__(self) -> int:
        raise NotImplementedError


class DenseVector(Vector):
    """
    A dense vector represented by a value array. We use a numpy array for
    storage, and arithmetic is delegated to the underlying numpy array.

    Examples
    --------
    >>> v = Vectors.dense([1.0, 2.0])
    >>> u = Vectors.dense([3.0, 4.0])
    >>> v + u
    DenseVector([4.0, 6.0])
    >>> 2 - v
    DenseVector([1.0, 0.0])
    >>> v / 2
    DenseVector([0.5, 1.0])
    >>> v * u
    DenseVector([3.0, 8.0])
    >>> u / v
    DenseVector([3.0, 2.0])
    >>> u % 2
    DenseVector([1.0, 0.0])
    >>> -v
    DenseVector([-1.0, -2.0])
    """

    def __init__(self, ar: Union[bytes, np.ndarray, Iterable[float]]):
        ar_: np.ndarray
        if isinstance(ar, bytes):
            ar_ = np.frombuffer(ar, dtype=np.float64)
        elif not isinstance(ar, np.ndarray):
            ar_ = np.array(ar, dtype=np.float64)
        else:
            ar_ = ar.astype(np.float64) if ar.dtype != np.float64 else ar
        self.array = ar_

    @staticmethod
    def parse(s: str) -> "DenseVector":
        """
        Parse string representation back into the DenseVector.

        Examples
        --------
        >>> DenseVector.parse(' [ 0.0,1.0,2.0, 3.0]')
        DenseVector([0.0, 1.0, 2.0, 3.0])
        """
        start = s.find("[")
        if start == -1:
            raise ValueError("Array should start with '['.")
        end = s.find("]")
        if end == -1:
            raise ValueError("Array should end with ']'.")
        s = s[start + 1 : end]

        try:
            values = [float(val) for val in s.split(",") if val]
        except ValueError:
            raise ValueError("Unable to parse values from %s" % s)
        return DenseVector(values)

    def toArray(self) -> np.ndarray:
        """
        Returns a numpy.ndarray
        """
        return self.array

    def asML(self) -> newlinalg.DenseVector:
        """
        Convert this vector to the new mllib-local representation.
        This does NOT copy the data; it copies references.

        .. versionadded:: 2.0.0

        Returns
        -------
        :py:class:`pyspark.ml.linalg.DenseVector`
        """
        return newlinalg.DenseVector(self.array)

    @property
    def values(self) -> np.ndarray:
        """
        Returns the underlying numpy.ndarray of values
        """
        return self.array

    @overload
    def __getitem__(self, item: int) -> np.float64:
        ...

    @overload
    def __getitem__(self, item: slice) -> np.ndarray:
        ...

    def __getitem__(self, item: Union[int, slice]) -> Union[np.float64, np.ndarray]:
        return self.array[item]

    def __len__(self) -> int:
        return len(self.array)

    def __str__(self) -> str:
        return "[" + ",".join([str(v) for v in self.array]) + "]"

    def __repr__(self) -> str:
        return "DenseVector([%s])" % (", ".join(_format_float(i) for i in self.array))

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, DenseVector):
            return np.array_equal(self.array, other.array)
        elif isinstance(other, SparseVector):
            if len(self) != other.size:
                return False
            return Vectors._equals(list(range(len(self))), self.array, other.indices, other.values)
        return False

    def __ne__(self, other: Any) -> bool:
        return not self == other

    def __hash__(self) -> int:
        size = len(self)
        result = 31 + size
        nnz = 0
        i = 0
        while i < size and nnz < 128:
            if self.array[i] != 0:
                result = 31 * result + i
                bits = _double_to_long_bits(self.array[i])
                result = 31 * result + (bits ^ (bits >> 32))
                nnz += 1
            i += 1
        return result

    def __getattr__(self, item: str) -> Any:
        return getattr(self.array, item)

    def __neg__(self) -> "DenseVector":
        return DenseVector(-self.array)

    def _delegate(op: str) -> Callable[["DenseVector", Any], "DenseVector"]:  # type: ignore[misc]
        def func(self: "DenseVector", other: Any) -> "DenseVector":
            if isinstance(other, DenseVector):
                other = other.array
            return DenseVector(getattr(self.array, op)(other))

        return func

    __add__ = _delegate("__add__")
    __sub__ = _delegate("__sub__")
    __mul__ = _delegate("__mul__")
    __div__ = _delegate("__div__")
    __truediv__ = _delegate("__truediv__")
    __mod__ = _delegate("__mod__")
    __radd__ = _delegate("__radd__")
    __rsub__ = _delegate("__rsub__")
    __rmul__ = _delegate("__rmul__")
    __rdiv__ = _delegate("__rdiv__")
    __rtruediv__ = _delegate("__rtruediv__")
    __rmod__ = _delegate("__rmod__")
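
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): arithmetic on a
# DenseVector is applied element-wise through the wrapped numpy array via the
# _delegate helpers above, and unknown attributes (e.g. ``sum``) fall through
# to the array via ``__getattr__``.
_example_dv = DenseVector([1.0, 2.0, 3.0])
assert (_example_dv * 2) == DenseVector([2.0, 4.0, 6.0])  # _delegate("__mul__")
assert float(_example_dv.sum()) == 6.0                    # numpy method via __getattr__
assert _example_dv[1] == 2.0 and len(_example_dv) == 3
del _example_dv
# ---------------------------------------------------------------------------
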


class SparseVector(Vector):
    """
    A simple sparse vector class for passing data to MLlib. Users may
    alternatively pass SciPy's {scipy.sparse} data types.
    """

    @overload
    def __init__(self, size: int, __indices: bytes, __values: bytes):
        ...

    @overload
    def __init__(self, size: int, *args: Tuple[int, float]):
        ...

    @overload
    def __init__(self, size: int, __indices: Iterable[int], __values: Iterable[float]):
        ...

    @overload
    def __init__(self, size: int, __pairs: Iterable[Tuple[int, float]]):
        ...

    @overload
    def __init__(self, size: int, __map: Dict[int, float]):
        ...

    def __init__(
        self,
        size: int,
        *args: Union[
            bytes, Tuple[int, float], Iterable[float], Iterable[Tuple[int, float]], Dict[int, float]
        ],
    ):
        """
        Create a sparse vector, using either a dictionary, a list of
        (index, value) pairs, or two separate arrays of indices and
        values (sorted by index).

        Parameters
        ----------
        size : int
            Size of the vector.
        args
            Active entries, as a dictionary {index: value, ...},
            a list of tuples [(index, value), ...], or a list of strictly
            increasing indices and a list of corresponding values [index, ...],
            [value, ...]. Inactive entries are treated as zeros.

        Examples
        --------
        >>> SparseVector(4, {1: 1.0, 3: 5.5})
        SparseVector(4, {1: 1.0, 3: 5.5})
        >>> SparseVector(4, [(1, 1.0), (3, 5.5)])
        SparseVector(4, {1: 1.0, 3: 5.5})
        >>> SparseVector(4, [1, 3], [1.0, 5.5])
        SparseVector(4, {1: 1.0, 3: 5.5})
        """
        self.size = int(size)
        """ Size of the vector. """

        assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
        if len(args) == 1:
            pairs = args[0]
            if type(pairs) == dict:
                pairs = pairs.items()
            pairs = cast(Iterable[Tuple[int, float]], sorted(pairs))
            self.indices = np.array([p[0] for p in pairs], dtype=np.int32)
            """ A list of indices corresponding to active entries. """
            self.values = np.array([p[1] for p in pairs], dtype=np.float64)
            """ A list of values corresponding to active entries. """
        else:
            if isinstance(args[0], bytes):
                assert isinstance(args[1], bytes), "values should be string too"
                if args[0]:
                    self.indices = np.frombuffer(args[0], np.int32)
                    self.values = np.frombuffer(args[1], np.float64)
                else:
                    # np.frombuffer() doesn't work well with empty string in older version
                    self.indices = np.array([], dtype=np.int32)
                    self.values = np.array([], dtype=np.float64)
            else:
                self.indices = np.array(args[0], dtype=np.int32)
                self.values = np.array(args[1], dtype=np.float64)
            assert len(self.indices) == len(self.values), "index and value arrays not same length"
            for i in range(len(self.indices) - 1):
                if self.indices[i] >= self.indices[i + 1]:
                    raise TypeError(
                        "Indices %s and %s are not strictly increasing"
                        % (self.indices[i], self.indices[i + 1])
                    )

    def numNonzeros(self) -> int:
        """
        Number of nonzero elements. This scans all active values and counts nonzeros.
        """
        return np.count_nonzero(self.values)

    def norm(self, p: "NormType") -> np.float64:
        """
        Calculates the norm of a SparseVector.

        Examples
        --------
        >>> a = SparseVector(4, [0, 1], [3., -4.])
        >>> a.norm(1)
        7.0
        >>> a.norm(2)
        5.0
        """
        return np.linalg.norm(self.values, p)

    @staticmethod
    def parse(s: str) -> "SparseVector":
        """
        Parse string representation back into the SparseVector.

        Examples
        --------
        >>> SparseVector.parse(' (4, [0,1 ],[ 4.0,5.0] )')
        SparseVector(4, {0: 4.0, 1: 5.0})
        """
        start = s.find("(")
        if start == -1:
            raise ValueError("Tuple should start with '('")
        end = s.find(")")
        if end == -1:
            raise ValueError("Tuple should end with ')'")
        s = s[start + 1 : end].strip()

        size = s[: s.find(",")]
        try:
            size = int(size)  # type: ignore[assignment]
        except ValueError:
            raise ValueError("Cannot parse size %s." % size)

        ind_start = s.find("[")
        if ind_start == -1:
            raise ValueError("Indices array should start with '['.")
        ind_end = s.find("]")
        if ind_end == -1:
            raise ValueError("Indices array should end with ']'")
        new_s = s[ind_start + 1 : ind_end]
        ind_list = new_s.split(",")
        try:
            indices = [int(ind) for ind in ind_list if ind]
        except ValueError:
            raise ValueError("Unable to parse indices from %s." % new_s)
        s = s[ind_end + 1 :].strip()

        val_start = s.find("[")
        if val_start == -1:
            raise ValueError("Values array should start with '['.")
        val_end = s.find("]")
        if val_end == -1:
            raise ValueError("Values array should end with ']'.")
        val_list = s[val_start + 1 : val_end].split(",")
        try:
            values = [float(val) for val in val_list if val]
        except ValueError:
            raise ValueError("Unable to parse values from %s." % s)
        return SparseVector(cast(int, size), indices, values)

    def toArray(self) -> np.ndarray:
        """
        Returns a copy of this SparseVector as a 1-dimensional NumPy array.
        """
        arr = np.zeros((self.size,), dtype=np.float64)
        arr[self.indices] = self.values
        return arr

    def asML(self) -> newlinalg.SparseVector:
        """
        Convert this vector to the new mllib-local representation.
        This does NOT copy the data; it copies references.

        .. versionadded:: 2.0.0

        Returns
        -------
        :py:class:`pyspark.ml.linalg.SparseVector`
        """
        return newlinalg.SparseVector(self.size, self.indices, self.values)

    def __len__(self) -> int:
        return self.size

    def __str__(self) -> str:
        inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
        vals = "[" + ",".join([str(v) for v in self.values]) + "]"
        return "(" + ",".join((str(self.size), inds, vals)) + ")"

    def __repr__(self) -> str:
        inds = self.indices
        vals = self.values
        entries = ", ".join(
            ["{0}: {1}".format(inds[i], _format_float(vals[i])) for i in range(len(inds))]
        )
        return "SparseVector({0}, {{{1}}})".format(self.size, entries)

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, SparseVector):
            return (
                other.size == self.size
                and np.array_equal(other.indices, self.indices)
                and np.array_equal(other.values, self.values)
            )
        elif isinstance(other, DenseVector):
            if self.size != len(other):
                return False
            return Vectors._equals(self.indices, self.values, list(range(len(other))), other.array)
        return False

    def __getitem__(self, index: int) -> np.float64:
        inds = self.indices
        vals = self.values
        if not isinstance(index, int):
            raise TypeError("Indices must be of type integer, got type %s" % type(index))

        if index >= self.size or index < -self.size:
            raise IndexError("Index %d out of bounds." % index)
        if index < 0:
            index += self.size

        if (inds.size == 0) or (index > inds.item(-1)):
            return np.float64(0.0)

        insert_index = np.searchsorted(inds, index)
        row_ind = inds[insert_index]
        if row_ind == index:
            return vals[insert_index]
        return np.float64(0.0)

    def __ne__(self, other: Any) -> bool:
        return not self.__eq__(other)

    def __hash__(self) -> int:
        result = 31 + self.size
        nnz = 0
        i = 0
        while i < len(self.values) and nnz < 128:
            if self.values[i] != 0:
                result = 31 * result + int(self.indices[i])
                bits = _double_to_long_bits(self.values[i])
                result = 31 * result + (bits ^ (bits >> 32))
                nnz += 1
            i += 1
        return result
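
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): only the active
# (index, value) pairs are stored, indexing an inactive position reads as 0.0,
# and toArray() expands back to a dense 1-D array.
_example_sv = SparseVector(4, [1, 3], [1.0, 5.5])
assert _example_sv[0] == 0.0 and _example_sv[3] == 5.5  # inactive entries read as 0.0
assert _example_sv.numNonzeros() == 2
assert _example_sv.toArray().tolist() == [0.0, 1.0, 0.0, 5.5]
del _example_sv
# ---------------------------------------------------------------------------
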


class Vectors:

    """
    Factory methods for working with vectors.

    Notes
    -----
    Dense vectors are simply represented as NumPy array objects,
    so there is no need to convert them for use in MLlib. For sparse vectors,
    the factory methods in this class create an MLlib-compatible type, or users
    can pass in SciPy's `scipy.sparse` column vectors.
    """

    @staticmethod
    @overload
    def sparse(size: int, __indices: bytes, __values: bytes) -> SparseVector:
        ...

    @staticmethod
    @overload
    def sparse(size: int, *args: Tuple[int, float]) -> SparseVector:
        ...

    @staticmethod
    @overload
    def sparse(size: int, __indices: Iterable[int], __values: Iterable[float]) -> SparseVector:
        ...

    @staticmethod
    @overload
    def sparse(size: int, __pairs: Iterable[Tuple[int, float]]) -> SparseVector:
        ...

    @staticmethod
    @overload
    def sparse(size: int, __map: Dict[int, float]) -> SparseVector:
        ...

    @staticmethod
    def sparse(
        size: int,
        *args: Union[
            bytes, Tuple[int, float], Iterable[float], Iterable[Tuple[int, float]], Dict[int, float]
        ],
    ) -> SparseVector:
        """
        Create a sparse vector, using either a dictionary, a list of
        (index, value) pairs, or two separate arrays of indices and
        values (sorted by index).

        Parameters
        ----------
        size : int
            Size of the vector.
        args
            Non-zero entries, as a dictionary, list of tuples,
            or two sorted lists containing indices and values.

        Examples
        --------
        >>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
        SparseVector(4, {1: 1.0, 3: 5.5})
        >>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
        SparseVector(4, {1: 1.0, 3: 5.5})
        >>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
        SparseVector(4, {1: 1.0, 3: 5.5})
        """
        return SparseVector(size, *args)  # type: ignore[arg-type]

    @staticmethod
    def dense(*elements: Union[float, bytes, np.ndarray, Iterable[float]]) -> DenseVector:
        """
        Create a dense vector of 64-bit floats from a Python list or numbers.

        Examples
        --------
        >>> Vectors.dense([1, 2, 3])
        DenseVector([1.0, 2.0, 3.0])
        >>> Vectors.dense(1.0, 2.0)
        DenseVector([1.0, 2.0])
        """
        if len(elements) == 1 and not isinstance(elements[0], (float, int)):
            # it's list, numpy.array or other iterable object.
            elements = elements[0]  # type: ignore[assignment]
        return DenseVector(cast(Iterable[float], elements))

    @staticmethod
    def fromML(vec: newlinalg.DenseVector) -> DenseVector:
        """
        Convert a vector from the new mllib-local representation.
        This does NOT copy the data; it copies references.

        .. versionadded:: 2.0.0

        Parameters
        ----------
        vec : :py:class:`pyspark.ml.linalg.Vector`

        Returns
        -------
        :py:class:`pyspark.mllib.linalg.Vector`
        """
        if isinstance(vec, newlinalg.DenseVector):
            return DenseVector(vec.array)
        elif isinstance(vec, newlinalg.SparseVector):
            return SparseVector(vec.size, vec.indices, vec.values)
        else:
            raise TypeError("Unsupported vector type %s" % type(vec))

    @staticmethod
    def stringify(vector: Vector) -> str:
        """
        Converts a vector into a string, which can be recognized by
        Vectors.parse().

        Examples
        --------
        >>> Vectors.stringify(Vectors.sparse(2, [1], [1.0]))
        '(2,[1],[1.0])'
        >>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
        '[0.0,1.0]'
        """
        return str(vector)

    @staticmethod
    def squared_distance(v1: Vector, v2: Vector) -> np.float64:
        """
        Squared distance between two vectors.
        v1 and v2 can be of type SparseVector, DenseVector, np.ndarray
        or array.array.

        Examples
        --------
        >>> a = Vectors.sparse(4, [(0, 1), (3, 4)])
        >>> b = Vectors.dense([2, 5, 4, 1])
        >>> a.squared_distance(b)
        51.0
        """
        v1, v2 = _convert_to_vector(v1), _convert_to_vector(v2)
        return v1.squared_distance(v2)  # type: ignore[attr-defined]

    @staticmethod
    def norm(vector: Vector, p: "NormType") -> np.float64:
        """
        Find norm of the given vector.
        """
        return _convert_to_vector(vector).norm(p)  # type: ignore[attr-defined]

    @staticmethod
    def parse(s: str) -> Vector:
        """Parse a string representation back into the Vector.

        Examples
        --------
        >>> Vectors.parse('[2,1,2 ]')
        DenseVector([2.0, 1.0, 2.0])
        >>> Vectors.parse(' ( 100, [0], [2])')
        SparseVector(100, {0: 2.0})
        """
        if s.find("(") == -1 and s.find("[") != -1:
            return DenseVector.parse(s)
        elif s.find("(") != -1:
            return SparseVector.parse(s)
        else:
            raise ValueError("Cannot find tokens '[' or '(' from the input string.")

    @staticmethod
    def _equals(
        v1_indices: Union[Sequence[int], np.ndarray],
        v1_values: Union[Sequence[float], np.ndarray],
        v2_indices: Union[Sequence[int], np.ndarray],
        v2_values: Union[Sequence[float], np.ndarray],
    ) -> bool:
        """
        Check equality between sparse/dense vectors;
        v1_indices and v2_indices are assumed to be strictly increasing.
        """
        v1_size = len(v1_values)
        v2_size = len(v2_values)
        k1 = 0
        k2 = 0
        all_equal = True
        while all_equal:
            while k1 < v1_size and v1_values[k1] == 0:
                k1 += 1
            while k2 < v2_size and v2_values[k2] == 0:
                k2 += 1

            if k1 >= v1_size or k2 >= v2_size:
                return k1 >= v1_size and k2 >= v2_size

            all_equal = v1_indices[k1] == v2_indices[k2] and v1_values[k1] == v2_values[k2]
            k1 += 1
            k2 += 1
        return all_equal
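
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): the Vectors factory
# builds either representation, the two compare equal when their entries
# match, and stringify()/parse() round-trip a vector through its string form.
_example_dense = Vectors.dense([0.0, 1.0, 0.0, 5.5])
_example_sparse = Vectors.sparse(4, [1, 3], [1.0, 5.5])
assert _example_sparse == _example_dense  # same entries, different storage
assert Vectors.parse(Vectors.stringify(_example_sparse)) == _example_sparse
del _example_dense, _example_sparse
# ---------------------------------------------------------------------------
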


class Matrix:

    __UDT__ = MatrixUDT()

    """
    Represents a local matrix.
    """

    def __init__(self, numRows: int, numCols: int, isTransposed: bool = False) -> None:
        self.numRows = numRows
        self.numCols = numCols
        self.isTransposed = isTransposed

    def toArray(self) -> np.ndarray:
        """
        Returns its elements in a NumPy ndarray.
        """
        raise NotImplementedError

    def asML(self) -> newlinalg.Matrix:
        """
        Convert this matrix to the new mllib-local representation.
        This does NOT copy the data; it copies references.
        """
        raise NotImplementedError

    @staticmethod
    def _convert_to_array(array_like: Union[bytes, Iterable[float]], dtype: Any) -> np.ndarray:
        """
        Convert Matrix attributes which are array-like or buffer to array.
        """
        if isinstance(array_like, bytes):
            return np.frombuffer(array_like, dtype=dtype)
        return np.asarray(array_like, dtype=dtype)


class DenseMatrix(Matrix):
    """
    Column-major dense matrix.
    """

    def __init__(
        self,
        numRows: int,
        numCols: int,
        values: Union[bytes, Iterable[float]],
        isTransposed: bool = False,
    ):
        Matrix.__init__(self, numRows, numCols, isTransposed)
        values = self._convert_to_array(values, np.float64)
        assert len(values) == numRows * numCols
        self.values = values

    def __reduce__(self) -> Tuple[Type["DenseMatrix"], Tuple[int, int, bytes, int]]:
        return DenseMatrix, (
            self.numRows,
            self.numCols,
            self.values.tobytes(),
            int(self.isTransposed),
        )

    def __str__(self) -> str:
        """
        Pretty printing of a DenseMatrix

        Examples
        --------
        >>> dm = DenseMatrix(2, 2, range(4))
        >>> print(dm)
        DenseMatrix([[ 0.,  2.],
                     [ 1.,  3.]])
        >>> dm = DenseMatrix(2, 2, range(4), isTransposed=True)
        >>> print(dm)
        DenseMatrix([[ 0.,  1.],
                     [ 2.,  3.]])
        """
        # Inspired by __repr__ in scipy matrices.
        array_lines = repr(self.toArray()).splitlines()

        # We need to adjust six spaces which is the difference in number
        # of letters between "DenseMatrix" and "array"
        x = "\n".join([(" " * 6 + line) for line in array_lines[1:]])
        return array_lines[0].replace("array", "DenseMatrix") + "\n" + x

    def __repr__(self) -> str:
        """
        Representation of a DenseMatrix

        Examples
        --------
        >>> dm = DenseMatrix(2, 2, range(4))
        >>> dm
        DenseMatrix(2, 2, [0.0, 1.0, 2.0, 3.0], False)
        """
        # If the number of values is less than seventeen, return them as is.
        # Else return the first eight values and the last eight values.
        if len(self.values) < 17:
            entries = _format_float_list(self.values)
        else:
            entries = (
                _format_float_list(self.values[:8])
                + ["..."]
                + _format_float_list(self.values[-8:])
            )
        return "DenseMatrix({0}, {1}, [{2}], {3})".format(
            self.numRows, self.numCols, ", ".join(entries), self.isTransposed
        )

    def toSparse(self) -> "SparseMatrix":
        """Convert to SparseMatrix"""
        if self.isTransposed:
            values = np.ravel(self.toArray(), order="F")
        else:
            values = self.values
        indices = np.nonzero(values)[0]
        colCounts = np.bincount(indices // self.numRows)
        colPtrs = np.cumsum(np.hstack((0, colCounts, np.zeros(self.numCols - colCounts.size))))
        values = values[indices]
        rowIndices = indices % self.numRows

        return SparseMatrix(self.numRows, self.numCols, colPtrs, rowIndices, values)

    def asML(self) -> newlinalg.DenseMatrix:
        """
        Convert this matrix to the new mllib-local representation.
        This does NOT copy the data; it copies references.

        .. versionadded:: 2.0.0

        Returns
        -------
        :py:class:`pyspark.ml.linalg.DenseMatrix`
        """
        return newlinalg.DenseMatrix(self.numRows, self.numCols, self.values, self.isTransposed)

    def __getitem__(self, indices: Tuple[int, int]) -> np.float64:
        i, j = indices
        if i < 0 or i >= self.numRows:
            raise IndexError("Row index %d is out of range [0, %d)" % (i, self.numRows))
        if j >= self.numCols or j < 0:
            raise IndexError("Column index %d is out of range [0, %d)" % (j, self.numCols))

        if self.isTransposed:
            return self.values[i * self.numCols + j]
        else:
            return self.values[i + j * self.numRows]

    def __eq__(self, other: Any) -> bool:
        if self.numRows != other.numRows or self.numCols != other.numCols:
            return False
        if isinstance(other, SparseMatrix):
            return np.all(self.toArray() == other.toArray()).tolist()

        self_values = np.ravel(self.toArray(), order="F")
        other_values = np.ravel(other.toArray(), order="F")
        return np.all(self_values == other_values).tolist()
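
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): values are stored in
# column-major order, so [1, 2, 0, 4] fills the first column with 1, 2 and the
# second with 0, 4, and (row, column) indexing reads through that layout.
_example_dm = DenseMatrix(2, 2, [1.0, 2.0, 0.0, 4.0])
assert _example_dm[0, 1] == 0.0 and _example_dm[1, 1] == 4.0  # (row, column) indexing
assert repr(_example_dm) == "DenseMatrix(2, 2, [1.0, 2.0, 0.0, 4.0], False)"
del _example_dm
# ---------------------------------------------------------------------------
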


class SparseMatrix(Matrix):
    """Sparse Matrix stored in CSC format."""

    def __init__(
        self,
        numRows: int,
        numCols: int,
        colPtrs: Union[bytes, Iterable[int]],
        rowIndices: Union[bytes, Iterable[int]],
        values: Union[bytes, Iterable[float]],
        isTransposed: bool = False,
    ) -> None:
        Matrix.__init__(self, numRows, numCols, isTransposed)
        self.colPtrs = self._convert_to_array(colPtrs, np.int32)
        self.rowIndices = self._convert_to_array(rowIndices, np.int32)
        self.values = self._convert_to_array(values, np.float64)

        if self.isTransposed:
            if self.colPtrs.size != numRows + 1:
                raise ValueError(
                    "Expected colPtrs of size %d, got %d." % (numRows + 1, self.colPtrs.size)
                )
        else:
            if self.colPtrs.size != numCols + 1:
                raise ValueError(
                    "Expected colPtrs of size %d, got %d." % (numCols + 1, self.colPtrs.size)
                )
        if self.rowIndices.size != self.values.size:
            raise ValueError(
                "Expected rowIndices of length %d, got %d."
                % (self.rowIndices.size, self.values.size)
            )

    def __str__(self) -> str:
        """
        Pretty printing of a SparseMatrix

        Examples
        --------
        >>> sm1 = SparseMatrix(2, 2, [0, 2, 3], [0, 1, 1], [2, 3, 4])
        >>> print(sm1)
        2 X 2 CSCMatrix
        (0,0) 2.0
        (1,0) 3.0
        (1,1) 4.0
        >>> sm1 = SparseMatrix(2, 2, [0, 2, 3], [0, 1, 1], [2, 3, 4], True)
        >>> print(sm1)
        2 X 2 CSRMatrix
        (0,0) 2.0
        (0,1) 3.0
        (1,1) 4.0
        """
        spstr = "{0} X {1} ".format(self.numRows, self.numCols)
        if self.isTransposed:
            spstr += "CSRMatrix\n"
        else:
            spstr += "CSCMatrix\n"

        cur_col = 0
        smlist = []

        # Display first 16 values.
        if len(self.values) <= 16:
            zipindval = zip(self.rowIndices, self.values)
        else:
            zipindval = zip(self.rowIndices[:16], self.values[:16])
        for i, (rowInd, value) in enumerate(zipindval):
            if self.colPtrs[cur_col + 1] <= i:
                cur_col += 1
            if self.isTransposed:
                smlist.append("({0},{1}) {2}".format(cur_col, rowInd, _format_float(value)))
            else:
                smlist.append("({0},{1}) {2}".format(rowInd, cur_col, _format_float(value)))
        spstr += "\n".join(smlist)

        if len(self.values) > 16:
            spstr += "\n.." * 2
        return spstr

    def __repr__(self) -> str:
        """
        Representation of a SparseMatrix

        Examples
        --------
        >>> sm1 = SparseMatrix(2, 2, [0, 2, 3], [0, 1, 1], [2, 3, 4])
        >>> sm1
        SparseMatrix(2, 2, [0, 2, 3], [0, 1, 1], [2.0, 3.0, 4.0], False)
        """
        rowIndices = list(self.rowIndices)
        colPtrs = list(self.colPtrs)

        if len(self.values) <= 16:
            values = _format_float_list(self.values)

        else:
            values = (
                _format_float_list(self.values[:8])
                + ["..."]
                + _format_float_list(self.values[-8:])
            )
            rowIndices = rowIndices[:8] + ["..."] + rowIndices[-8:]

        if len(self.colPtrs) > 16:
            colPtrs = colPtrs[:8] + ["..."] + colPtrs[-8:]

        return "SparseMatrix({0}, {1}, [{2}], [{3}], [{4}], {5})".format(
            self.numRows,
            self.numCols,
            ", ".join([str(ptr) for ptr in colPtrs]),
            ", ".join([str(ind) for ind in rowIndices]),
            ", ".join(values),
            self.isTransposed,
        )

    def __reduce__(self) -> Tuple[Type["SparseMatrix"], Tuple[int, int, bytes, bytes, bytes, int]]:
        return SparseMatrix, (
            self.numRows,
            self.numCols,
            self.colPtrs.tobytes(),
            self.rowIndices.tobytes(),
            self.values.tobytes(),
            int(self.isTransposed),
        )

    def __getitem__(self, indices: Tuple[int, int]) -> np.float64:
        i, j = indices
        if i < 0 or i >= self.numRows:
            raise IndexError("Row index %d is out of range [0, %d)" % (i, self.numRows))
        if j < 0 or j >= self.numCols:
            raise IndexError("Column index %d is out of range [0, %d)" % (j, self.numCols))

        # If a CSR matrix is given, then the row index should be searched
        # for in ColPtrs, and the column index should be searched for in the
        # corresponding slice obtained from rowIndices.
        if self.isTransposed:
            j, i = i, j

        colStart = self.colPtrs[j]
        colEnd = self.colPtrs[j + 1]
        nz = self.rowIndices[colStart:colEnd]
        ind = np.searchsorted(nz, i) + colStart
        if ind < colEnd and self.rowIndices[ind] == i:
            return self.values[ind]
        else:
            return np.float64(0.0)

    def toArray(self) -> np.ndarray:
        """
        Return a numpy.ndarray
        """
        A = np.zeros((self.numRows, self.numCols), dtype=np.float64, order="F")
        for k in range(self.colPtrs.size - 1):
            startptr = self.colPtrs[k]
            endptr = self.colPtrs[k + 1]
            if self.isTransposed:
                A[k, self.rowIndices[startptr:endptr]] = self.values[startptr:endptr]
            else:
                A[self.rowIndices[startptr:endptr], k] = self.values[startptr:endptr]
        return A

    def asML(self) -> newlinalg.SparseMatrix:
        """
        Convert this matrix to the new mllib-local representation.
        This does NOT copy the data; it copies references.

        .. versionadded:: 2.0.0

        Returns
        -------
        :py:class:`pyspark.ml.linalg.SparseMatrix`
        """
        return newlinalg.SparseMatrix(
            self.numRows,
            self.numCols,
            self.colPtrs,
            self.rowIndices,
            self.values,
            self.isTransposed,
        )

    # TODO: More efficient implementation:
    def __eq__(self, other: Any) -> bool:
        assert isinstance(other, Matrix)
        return np.all(self.toArray() == other.toArray()).tolist()
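
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): CSC layout. colPtrs
# [0, 2, 3] says column 0 owns the first two stored values and column 1 the
# third, while rowIndices give each stored value's row.
_example_sm = SparseMatrix(2, 2, [0, 2, 3], [0, 1, 1], [2.0, 3.0, 4.0])
assert _example_sm[1, 0] == 3.0 and _example_sm[0, 1] == 0.0
assert _example_sm.toArray().tolist() == [[2.0, 0.0], [3.0, 4.0]]
del _example_sm
# ---------------------------------------------------------------------------
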


class Matrices:
    @staticmethod
    def dense(numRows: int, numCols: int, values: Union[bytes, Iterable[float]]) -> DenseMatrix:
        """
        Create a DenseMatrix
        """
        return DenseMatrix(numRows, numCols, values)

    @staticmethod
    def sparse(
        numRows: int,
        numCols: int,
        colPtrs: Union[bytes, Iterable[int]],
        rowIndices: Union[bytes, Iterable[int]],
        values: Union[bytes, Iterable[float]],
    ) -> SparseMatrix:
        """
        Create a SparseMatrix
        """
        return SparseMatrix(numRows, numCols, colPtrs, rowIndices, values)

    @staticmethod
    def fromML(mat: newlinalg.Matrix) -> Matrix:
        """
        Convert a matrix from the new mllib-local representation.
        This does NOT copy the data; it copies references.

        .. versionadded:: 2.0.0

        Parameters
        ----------
        mat : :py:class:`pyspark.ml.linalg.Matrix`

        Returns
        -------
        :py:class:`pyspark.mllib.linalg.Matrix`
        """
        if isinstance(mat, newlinalg.DenseMatrix):
            return DenseMatrix(mat.numRows, mat.numCols, mat.values, mat.isTransposed)
        elif isinstance(mat, newlinalg.SparseMatrix):
            return SparseMatrix(
                mat.numRows, mat.numCols, mat.colPtrs, mat.rowIndices, mat.values, mat.isTransposed
            )
        else:
            raise TypeError("Unsupported matrix type %s" % type(mat))
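
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): the Matrices factory
# mirrors Vectors, and asML()/fromML() convert between this module and
# pyspark.ml.linalg without copying the underlying value arrays.
_example_m = Matrices.dense(2, 2, [1.0, 2.0, 3.0, 4.0])
_example_ml = _example_m.asML()  # pyspark.ml.linalg.DenseMatrix
_example_back = Matrices.fromML(_example_ml)
assert _example_back.values.tolist() == _example_m.values.tolist()
del _example_m, _example_ml, _example_back
# ---------------------------------------------------------------------------
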


class QRDecomposition(Generic[QT, RT]):
    """
    Represents QR factors.
    """

    def __init__(self, Q: QT, R: RT) -> None:
        self._Q = Q
        self._R = R

    @property  # type: ignore[misc]
    @since("2.0.0")
    def Q(self) -> QT:
        """
        An orthogonal matrix Q in a QR decomposition.
        May be null if not computed.
        """
        return self._Q

    @property  # type: ignore[misc]
    @since("2.0.0")
    def R(self) -> RT:
        """
        An upper triangular matrix R in a QR decomposition.
        """
        return self._R
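
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the upstream module): QRDecomposition is a
# typed container for the Q and R factors; in practice it is returned by
# distributed routines such as
# pyspark.mllib.linalg.distributed.RowMatrix.tallSkinnyQR, but it can hold any
# pair of factors.
_example_qr = QRDecomposition(
    Matrices.dense(2, 2, [1.0, 0.0, 0.0, 1.0]),  # Q (here, the 2x2 identity)
    Matrices.dense(2, 2, [3.0, 0.0, 1.0, 2.0]),  # R (upper triangular, column-major values)
)
assert _example_qr.Q is not None and _example_qr.R is not None
del _example_qr
# ---------------------------------------------------------------------------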