Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[do not merge - discussion ] QueueTasks #32229

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

eileenmcnaughton
Copy link
Contributor

@eileenmcnaughton eileenmcnaughton commented Feb 26, 2025

Overview

@totten I put this up here primarily to chat with you about how we could use some of our experience trying to do 'our stuff' with the queue

I tried out re-writing our dedupe script wrapper into a queue item - this is the first commit. Ulitmately I feel like a 'job' like this should be something that could be easily enabled in core without having to have an extension as the actual action exists as a (v3) api - it's just the tracking of progress that is missing

I also added in the QueueHelper we have been using - this is the same sort of idea - ie different ways of iterating through stuff in batches.

I feel like all the potential of this is just beyond my grasp

Before

What is the old user-interface or technical-contract (as appropriate)?
For optimal clarity, include a concrete example such as a screenshot, GIF (LICEcap, SilentCast), or code-snippet.

After

What changed? What is new old user-interface or technical-contract?
For optimal clarity, include a concrete example such as a screenshot, GIF (LICEcap, SilentCast), or code-snippet.

Technical Details

If the PR involves technical details/changes/considerations which would not be manifest to a casual developer skimming the above sections, please describe the details here.

Comments

Anything else you would like the reviewer to note

Copy link

civibot bot commented Feb 26, 2025

🤖 Thank you for contributing to CiviCRM! ❤️ We will need to test and review this PR. 👷

Introduction for new contributors...
  • If this is your first PR, an admin will greenlight automated testing with the command ok to test or add to whitelist.
  • A series of tests will automatically run. You can see the results at the bottom of this page (if there are any problems, it will include a link to see what went wrong).
  • A demo site will be built where anyone can try out a version of CiviCRM that includes your changes.
  • If this process needs to be repeated, an admin will issue the command test this please to rerun tests and build a new demo site.
  • Before this PR can be merged, it needs to be reviewed. Please keep in mind that reviewers are volunteers, and their response time can vary from a few hours to a few weeks depending on their availability and their knowledge of this particular part of CiviCRM.
  • A great way to speed up this process is to "trade reviews" with someone - find an open PR that you feel able to review, and leave a comment like "I'm reviewing this now, could you please review mine?" (include a link to yours). You don't have to wait for a response to get started (and you don't have to stop at one!) the more you review, the faster this process goes for everyone 😄
  • To ensure that you are credited properly in the final release notes, please add yourself to contributor-key.yml
  • For more information about contributing, see CONTRIBUTING.md.
Quick links for reviewers...

➡️ Online demo of this PR 🔗

@civibot civibot bot added the master label Feb 26, 2025
protected ?int $ruleGroupID = NULL;
protected ?int $minimumContactID = NULL;

public function _run(Result $result) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran this to initially create the queue item

* This comes from https://gist.github.com/totten/fa830cfced9bb7a92dea485f5422055a
* & will hopefully be moved to core.
*/
class QueueHelper {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is from the second commit - ie the second line of thinking (it's the only thing in that commit)

'values' => [
'name' => 'batch_merge',
'type' => 'Sql',
'runner' => 'task',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarification -- Setting this runner=>task implies that the queue contains CRM_Queue_Task records. There is an existing handler (CRM_Queue_BAO_Queue::hook_civicrm_queueRun_task) which tries to execute task items.

To create a new kind of record (e.g. basic array), in this context, it would look like:

  1. Declare the queue with 'runner'=>'batch_merge'
  2. In the handler, subscribe to hook_civicrm_queueRun_batch_merge

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@totten your PR that I just merged changed that though....

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sure. The implemntation/listener for hook_civicrm_queueRun_task moved from CRM_Queue_BAO_Queue to CRM_Queue_TaskHandler. In either case, the task is already assigned the meaning of "Handle CRM_Queue_Task."

Perhaps #32233 would make it easier to skim+recognize the connection.

}

$result = civicrm_api3('Job', 'process_batch_merge', $mergeParams);
BatchTask::insert(FALSE)
Copy link
Member

@totten totten Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is the mechanism here a kind of infinite self-propagation? Like, you manually call BatchTask::insert() once, and then it will continuously reschedule itself (either... until it crashes... or until you suspend the queue and manually delete the current queue-item)?

And if you call BatchTask::insert() three times (with different parameters), then there would be three of these self-propagating things going at the same time? (e.g. with different values ofgroup_id,rule_group_id,limit,etc).

The overall result reminds me of, say, forking a process in bash with &. You invoke a command three times, and you get three self-perpetuating processes.

api4_batchtask --rule_group_id=100 --limit=50 &
api4_batchtask --rule_group_id=107 --limit=75 &
api4_batchtask --rule_group_id=111 --limit=50 &

Which I guess makes sense -- it sounds like you're coming from an existing script which was written in the bash-job-management paradigm.

Defining long-running/self-propagating things on top of Queues might be quite powerful. I'm not sure what to call that. (Queue, process, thread... these words have existing meanings.... "Queue-based-process-chain" kind of describes it, but it's messyjargon... we don't really use threads in PHP, so I guess we could claim CiviThread... I dunno.) Whatever it's called, it needs management mechanisms (akin to bash's ps -a, jobs, wait, fg, $!, etc).

Maybe that is somewhere we need to go... but I wasn't quite expecting it yet, so just some due-diligence.


These feel like three distinct areas in my head:

  1. Running dedupe on all modified contacts: There are many ways one could implement that (with or without queueing or background processes). No doubt, some ways are better/worse than others.
  2. Executing ScheduledJobs/crons via queues: Whenever the timer hits ("every hour" or "every day" or whatever), put the ScheduledJob into a queue (which enables you distribute the load across some variable #consumer agents)
  3. Spawning processes to execute indefinitely: Using Queues to spawn single-threaded, self-perpetuating process (with some sequence of steps, chunked-out, which gives you stop/resume/retry and load-management).

For the general goal of "run dedupe on all modified contacts", here's the kind of thing I would've expected to see in a naive/bog-standard "queueing" paradigm:

  1. Declare a Queue:
    • Give it a name like dedupe
    • Set the options for concurrency and error-handling to taste (e.g. type=>Sql, batch_limit=>1 to be conservative; or type=>SqlParallel,batch_limit=>50 to be wild).
  2. Feed contact IDs into the queue: Perhaps via...
    • hook_postCommit: Whenever a Contact changes, enqueue a dedupe task; or...
    • hook_cron/ScheduledJob/crontab: Every $x minutes, scan for modified Contacts. Enqueue a dedupe task for each.
  3. Work the queue: Whenever you receive a contact ID, run the (comparatively expensive) dedupe logic and (if appropriate) merge the contacts.

From where I sit, that seems true to the genre.

  • It has virtues like: (a) you can manage concurrency, throttling, and errors as policy-options -- you can try diff options without needing to rewrite and (b) the expensive subtasks are broken out into discrete parts.
  • However, it assumes that "dedupe" is a discrete task (in the same way that "Send an email" or "Send an SMS" or "Generate a report" is a discrete task). And this may not be true.
    • (Maybe the dedupe algorithm performs better when targeting a set of, say, 50 contacts?)
    • (Random aside: for purposes of a background-cleanup-process, I wonder if dedupe-scan should run with relaxed READ UNCOMMITTED policy, and then dedupe-merge should run with stricter REPEATABLE READ or SERIALIZABLE? i.e. don't do any kind of locking/sync until you have a strong indication of a real duplicate)

You've spent a lot of time in the guts of the dedupe code, so I'm guessing there's some reason why you've skipped past the straight/naive form of queuing. But I can only guess at why. Maybe it's better if you walk me from the "straight/naive" to the more nuanced view.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants