diff --git a/Cekirdekler/Cekirdekler/ClPipeline.cs b/Cekirdekler/Cekirdekler/ClPipeline.cs index 567d0d4..268345a 100644 --- a/Cekirdekler/Cekirdekler/ClPipeline.cs +++ b/Cekirdekler/Cekirdekler/ClPipeline.cs @@ -3252,7 +3252,7 @@ public enum ClTaskType:int TASK_MESSAGE_DEFAULT = 0, /// - /// not implemeneted + /// /// enables single device mode and selects a device for the following tasks until SELECT_END is reached. /// if SELECT_END is quickly enqueued on device, remaining tasks can be enqueued to other devices concurrently /// multiple select groups can run on different devices @@ -3262,7 +3262,7 @@ public enum ClTaskType:int TASK_MESSAGE_DEVICE_SELECT_BEGIN = 1, /// - /// not implemeneted + /// /// deselects device of single device mode if it is enabled /// this does not ensure any synchronization between devices. Other devices can compute other groups of tasks /// SELECT_END must exist between two SELECT_BEGIN. Multiple begin-end ranges must not overlap @@ -3282,8 +3282,10 @@ public enum ClTaskType:int TASK_MESSAGE_GLOBAL_SYNCHRONIZATION_LAST = 8, /// - /// not implemeneted - /// runs this task on all devices when device pool intercepts this(writing results to host should be done by only 1 device) + /// runs this task on all devices when device pool intercepts this + /// must not write to host(array.write or array.writeAll must not be enabled) + /// meant for only initializing all devices' buffers, read-only, even on kernel side + /// /// TASK_MESSAGE_BROADCAST = 16, @@ -4158,9 +4160,27 @@ void produceTasksComputeAtWill() // if single device mode has ended handleSingleDeviceModeEnd(data.task.type); + // if task is to be run on all devices + if ((data.task.type & ClTaskType.TASK_MESSAGE_BROADCAST) > 0) + { + for (int i = 0; i < numDevices; i++) + { + ClPoolTaskIdPair broadcastedTask = new ClPoolTaskIdPair(); + broadcastedTask.id = data.id; + broadcastedTask.serialMode = data.serialMode; + broadcastedTask.deviceIndex = i; + broadcastedTask.task = data.task.duplicate(); + pipe.push(broadcastedTask); + } + } + else + { - // sends to common queue that all devices consume - pipe.push(data); + // sends to common queue, an idle device will consume + pipe.push(data); + } + + // reactivate device threads to process new data wakeDevices(); // if global synchronization_last is taken @@ -4685,7 +4705,7 @@ void consumeTasksComputeAtWill() } - + // getting all tasks quickly to cache if all are prepared for this device only bool tasksForThisDeviceExist = true; int tmpCtr = 0; while(tasksForThisDeviceExist && tmpCtr<20) @@ -4695,7 +4715,7 @@ void consumeTasksComputeAtWill() if (newCacheData != null) { cachePipe.push(newCacheData); - if(newCacheData.deviceIndex==deviceIndex) + if((newCacheData.deviceIndex==deviceIndex) || (newCacheData.serialMode)) { // this is single device mode enabled series of tasks tasksForThisDeviceExist = true; @@ -4758,7 +4778,7 @@ void consumeTasksComputeAtWill() { float probability = (((float)(data.task.totalTasks- data.task.remainingTasks)) / ((float)(data.task.totalTasks + 1))); float testing = (float)rand.NextDouble(); - if ((testing < (probability*probability)) /*|| ((data.task.totalTasks - data.task.remainingTasks) > (data.task.totalTasks - 3))*/) + if ((testing < (probability*probability) || (data.task.remainingTasks <=2 )) /*|| ((data.task.totalTasks - data.task.remainingTasks) > (data.task.totalTasks - 3))*/) { numberCruncher.fineGrainedQueueControl = true; }