diff --git a/changelogs/unreleased/234-pawanpraka1 b/changelogs/unreleased/234-pawanpraka1
new file mode 100644
index 000000000..a8bc44a28
--- /dev/null
+++ b/changelogs/unreleased/234-pawanpraka1
@@ -0,0 +1 @@
+add support for creating the Clone from volume as datasource
diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go
index 69c815551..66816699e 100644
--- a/pkg/driver/controller.go
+++ b/pkg/driver/controller.go
@@ -165,8 +165,56 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
 	return selected, nil
 }
 
-// CreateZFSClone create a clone of zfs volume
-func CreateZFSClone(req *csi.CreateVolumeRequest, snapshot string) (string, error) {
+// CreateVolClone creates the clone from a volume
+func CreateVolClone(req *csi.CreateVolumeRequest, srcVol string) (string, error) {
+	volName := req.GetName()
+	parameters := req.GetParameters()
+	// lower case keys, cf CreateZFSVolume()
+	pool := helpers.GetInsensitiveParameter(&parameters, "poolname")
+	size := getRoundedCapacity(req.GetCapacityRange().RequiredBytes)
+	volsize := strconv.FormatInt(int64(size), 10)
+
+	vol, err := zfs.GetZFSVolume(srcVol)
+	if err != nil {
+		return "", status.Error(codes.NotFound, err.Error())
+	}
+
+	if vol.Spec.PoolName != pool {
+		return "", status.Errorf(codes.Internal,
+			"clone: different pool src pool %s dst pool %s",
+			vol.Spec.PoolName, pool)
+	}
+
+	if vol.Spec.Capacity != volsize {
+		return "", status.Error(codes.Internal, "clone: volume size is not matching")
+	}
+
+	selected := vol.Spec.OwnerNodeID
+
+	labels := map[string]string{zfs.ZFSVolKey: vol.Name}
+
+	// create the clone from the source volume
+
+	volObj, err := volbuilder.NewBuilder().
+		WithName(volName).
+		WithVolumeStatus(zfs.ZFSStatusPending).
+		WithLabels(labels).Build()
+
+	volObj.Spec = vol.Spec
+	// use the snapshot name same as new volname
+	volObj.Spec.SnapName = vol.Name + "@" + volName
+
+	err = zfs.ProvisionVolume(volObj)
+	if err != nil {
+		return "", status.Errorf(codes.Internal,
+			"clone: not able to provision the volume %s", err.Error())
+	}
+
+	return selected, nil
+}
+
+// CreateSnapClone creates the clone from a snapshot
+func CreateSnapClone(req *csi.CreateVolumeRequest, snapshot string) (string, error) {
 
 	volName := req.GetName()
 	parameters := req.GetParameters()
@@ -243,7 +291,10 @@ func (cs *controller) CreateVolume(
 	if contentSource != nil && contentSource.GetSnapshot() != nil {
 		snapshotID := contentSource.GetSnapshot().GetSnapshotId()
 
-		selected, err = CreateZFSClone(req, snapshotID)
+		selected, err = CreateSnapClone(req, snapshotID)
+	} else if contentSource != nil && contentSource.GetVolume() != nil {
+		srcVol := contentSource.GetVolume().GetVolumeId()
+		selected, err = CreateVolClone(req, srcVol)
 	} else {
 		selected, err = CreateZFSVolume(req)
 	}
diff --git a/pkg/zfs/zfs_util.go b/pkg/zfs/zfs_util.go
index 213394d0b..6b2ed6530 100644
--- a/pkg/zfs/zfs_util.go
+++ b/pkg/zfs/zfs_util.go
@@ -401,6 +401,26 @@ func CreateVolume(vol *apis.ZFSVolume) error {
 func CreateClone(vol *apis.ZFSVolume) error {
 	volume := vol.Spec.PoolName + "/" + vol.Name
 
+	if srcVol, ok := vol.Labels[ZFSVolKey]; ok {
+		// datasource is volume, create the snapshot first
+		snap := &apis.ZFSSnapshot{}
+		snap.Name = vol.Name // use volname as snapname
+		snap.Spec = vol.Spec
+		// add src vol name
+		snap.Labels = map[string]string{ZFSVolKey: srcVol}
+
+		klog.Infof("creating snapshot %s@%s for the clone %s", srcVol, snap.Name, volume)
+
+		err := CreateSnapshot(snap)
+
+		if err != nil {
+			klog.Errorf(
+				"zfs: could not create snapshot for the clone vol %s snap %s err %v", volume, snap.Name, err,
+			)
+			return err
+		}
+	}
+
 	if err := getVolume(volume); err != nil {
 		var args []string
 		args = buildCloneCreateArgs(vol)
@@ -580,6 +600,27 @@ func DestroyVolume(vol *apis.ZFSVolume) error {
 		)
 		return err
 	}
+
+	if srcVol, ok := vol.Labels[ZFSVolKey]; ok {
+		// datasource is volume, delete the dependent snapshot
+		snap := &apis.ZFSSnapshot{}
+		snap.Name = vol.Name // snapname is same as volname
+		snap.Spec = vol.Spec
+		// add src vol name
+		snap.Labels = map[string]string{ZFSVolKey: srcVol}
+
+		klog.Infof("destroying snapshot %s@%s for the clone %s", srcVol, snap.Name, volume)
+
+		err := DestroySnapshot(snap)
+
+		if err != nil {
+			// no need to reconcile as volume has already been deleted
+			klog.Errorf(
+				"zfs: could not destroy snapshot for the clone vol %s snap %s err %v", volume, snap.Name, err,
+			)
+		}
+	}
+
 	klog.Infof("destroyed volume %s", volume)
 
 	return nil