Show HN: WakaQ - a Python distributed task queue (wakatime.com)
147 points by welder on Sept 5, 2022 | 68 comments


The project looks nice, and I will certainly follow it.

But the author expresses way too much confidence in the description of the project. It has no tests, no type hints, no pluggable error handling, no fallback when the Redis instance dies, and has apparently been used in prod by a single entity.

Stating "WakaQ is stable and ready to use in production" is irresponsible, as a task queue is critical infrastructure and this could lure devs into thinking this code is way more mature than it is.

Of course, one should do due diligence when evaluating a new part of the stack. I also appreciate the enthusiasm of the article, and I'm sure the code already provides many benefits for the author.

But no, an asynchronous execution system dealing with priorities, resilience, and network messages, one that has existed for a month, is in no way stable and ready. It's nascent and promising.


> Stating "WakaQ is stable and ready to use in production" is irresponsible

I've changed it to "WakaQ is still a new project, so use at your own risk."


Nice work on this! I’ve looked into task queue libraries for Node and Java in the past. Yours looks straightforward. A few questions:

1. What’s the error handling strategy for when a task/payload fails?

2. How exactly do delayed tasks work? For example, are you delaying execution until, say, 10 minutes later? How do you process delayed tasks in sequential timed order?

3. What kind of metrics/stats are available?

4. Is there a way to pause and resume, or is this the same as start and stop?

Congrats.


1. That's the part I'm not happy with, but currently I use the `@wakaq.after_worker_started` decorator to set up an error logging handler in each worker. It outputs to a file that gets aggregated for error reporting, but without examples most people wouldn't know to do that. Here's the code: https://gist.github.com/alanhamlett/365d48276ac054ae75e59525...

2. Delayed tasks are added to a Redis sorted set, where the eta datetime (when the task should run) is the score. The sorted set is then queried for all items/tasks scored between zero and the current time, which returns the eta tasks that are ready to run. Those tasks are pushed onto the front of their equivalent non-eta queue and executed by the next worker (see the sketch after this list). Eta tasks might not run exactly at the eta datetime, but they shouldn't run too early as long as all worker machines have clocks synced with time servers.

3. wakaq-info prints all queues with counts of pending tasks and pending eta tasks. Throughput isn't printed and has to be calculated by sampling wakaq-info at least twice. You can also import info from wakaq.utils to use it from a Python script.

4. No built-in way to pause. I pause by sending a broadcast task to all workers, which sets exclude_queues to all queues and then restarts the worker. Then it only listens for broadcast tasks and can be turned back on with another broadcast task.
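
Here's a minimal sketch of the eta mechanism from point 2, using redis-py. The key names and helper functions are illustrative, not WakaQ's actual internals:

    import time

    import redis

    r = redis.Redis()

    def enqueue_eta(payload: bytes, eta_unix: float) -> None:
        # Sorted set scored by the eta timestamp
        r.zadd("myqueue:eta", {payload: eta_unix})

    def move_ready_eta_tasks() -> None:
        # Tasks scored between 0 and now are ready to run
        for payload in r.zrangebyscore("myqueue:eta", 0, time.time()):
            r.zrem("myqueue:eta", payload)
            # LPUSH puts eta tasks at the FRONT of the normal queue,
            # so workers (which pop from the left) run them next
            r.lpush("myqueue", payload)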

> Congrats

Thanks!


Very nice and ambitious project. Did you compare it against Dramatiq? It was created to overcome the same shortcomings of Celery.


Dramatiq is easy to daemonize, which I like a lot, but the scheduling feature isn't there and that's a big drawback. I had to use my own open source scheduler to schedule a function. https://github.com/rajasimon/beatserver


Periodiq is a nice scheduler that works well with Dramatiq.


I was initially excited when Dramatiq was released, but I think the GNU license prohibits using it for commercial projects?

Also, I really wanted something simple and easy to maintain after working around so many bugs. Dramatiq isn't as complicated as Celery but it still has features I don't need.


It’s LGPL, which is usually fine if you’re just using it as-is.


Maybe I should have used Dramatiq instead of building my own, but it's already done. I'm surprised how well WakaQ is working in production actually. If any issues come up they should be faster to fix than making PRs to Dramatiq.


There’s nothing inherently wrong with doing it yourself if you’re consciously aware of the costs and benefits. Promise us you’ll do a blog post in a year talking about it, though! :)


You are running this in production? I had a look at the github repo and did not find any tests. How are you QAing the project?


I built a local test app that does have some integration tests, and then I just went all in on production. That's why I was surprised it worked so well. I was ready to lose a few nights' sleep debugging it in prod, but thankfully didn't have to.


Thanks for the honest answer. I hope you enjoyed developing this software. I will use it the next time I need a queue.


I love that someone else also thinks that Celery has grown too big and too buggy.

It would be awesome if Airflow started using this.


I made one as well, called SAQ!

https://github.com/tobymao/saq

Looks like you use BLPOP, but there could be race conditions or lost jobs if your workers crash; check out BLMOVE and RPOPLPUSH.


> I made one as well, called SAQ! https://github.com/tobymao/saq

Awesome! I'll check out your implementation.

> Looks like you use BLPOP, but there could be race conditions or lost jobs if your workers crash; check out BLMOVE and RPOPLPUSH.

I prefer ack-first (BLPOP) and designing the tasks to expect some amount of failures.
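
For anyone comparing the two patterns, a rough redis-py sketch (queue names are illustrative):

    import redis

    r = redis.Redis()

    # Ack-first: the task leaves Redis the moment it's popped, so a
    # worker crash mid-task loses that one task
    item = r.blpop(["myqueue"], timeout=5)  # (queue_name, payload) or None

    # Ack-last alternative: atomically move the task to a processing
    # list, removing it only after success; a reaper can re-enqueue
    # entries that sit there too long
    payload = r.blmove("myqueue", "myqueue:processing", timeout=5)
    if payload is not None:
        ...  # run the task
        r.lrem("myqueue:processing", 1, payload)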


I guess this should be in the FAQ.


In case anyone is after a non-distributed task queue for simple cases, I've used Huey on top of SQLite with ease.


Since Redis is the data store, if your Redis instance goes away, do all of your tasks go away too?


This is still a pretty common misconception (and one that I held until recently, when looking at Redis configuration options again), but Redis does have durable operating modes that reload the datastore after going offline (these do significantly trade off speed for durability). If your task queue workload is high enough that running Redis in a durable fashion is insufficient, then it's probably time to think about a different queuing system than WakaQ.
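
For reference, a minimal sketch of enabling those durable modes at runtime via redis-py (the same settings normally live in redis.conf; the values here are just one reasonable choice):

    import redis

    r = redis.Redis()

    # Append-only file, fsynced every second: at most ~1s of writes
    # can be lost on a crash, at some throughput cost
    r.config_set("appendonly", "yes")
    r.config_set("appendfsync", "everysec")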


I'm talking about if the VM/instance/disk goes away. It looks like Redis is a single point of failure unless you run Redis in a redundant way, but skimming the code it looks like that is at best left as an exercise to the reader. I.e. redundancy in the data store doesn't appear to be handled by this code.


Correct, this setup wouldn't survive chaos monkey.


Then I wouldn't call it distributed, since it's centralized on Redis.

Otherwise, good product.


Goes away as in you delete your Redis VM, or as in restarting the Redis server? Redis persists to disk, so your tasks aren't lost.


Like the VM/Instance/Machine dies with its disk.


Yes. I don't know of any distributed task queue that stays online or backs up tasks when the broker dies. Maybe one could be built on CockroachDB? My implementation is designed to process tasks fast, and if the Redis server dies (hasn't happened in 10 years) you just boot up a new one. I try to design my tasks to expect some failures and be able to re-run safely.


RabbitMQ and SQS both have redundancy built in.

There's nothing wrong with the trade-off you made, but when I read distributed I assumed you meant the task broker was distributed and therefore redundant, not the consumers.


Sorry about the confusion. Btw, we've met in person once in Mountain View some years back. Nice to see you here :)


Is it at-most-once delivery? How do you deal with crashes?


WakaQ re-forks workers that crash, but normally a crash means an exception was raised in application task code.

I do a combination of a few things to handle exceptions in tasks:

1. `@wakaq.task(max_retries=X)` auto-retries the task on soft timeouts, which are usually intermittent and faster the second time the task runs

2. use error handling to track task failure rates https://gist.github.com/alanhamlett/365d48276ac054ae75e59525...

3. build tasks to expect some failures by being safe to re-run without, for example, sending an email twice (see the sketch after this list). This usually means exclusive locking in tasks, or keeping counters of recent task runs or task progress somewhere

4. try/except blocks inside tasks, which roll back and re-enqueue the same task to retry later

Basically I handle all crashes at the application task level, not in WakaQ.
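
To illustrate point 3, a minimal sketch of the exclusive-locking pattern using plain redis-py (the task body and key name are illustrative assumptions, not WakaQ's API):

    import redis

    r = redis.Redis()

    def send_welcome_email(user_id: int) -> None:
        # SET NX EX acts as an exclusive lock: only the first of any
        # concurrent or repeated runs acquires it, so re-runs are harmless
        if not r.set(f"sent:welcome_email:{user_id}", 1, nx=True, ex=86400):
            return  # already sent (or in flight); skip the double-send
        ...  # actually send the email here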


Thank you for explaining! I think this should be part of the README. I am going through the code and loving the simplicity of it!


Is there any communication between workers? Say I start two identical workers that have the same schedules set up. Will it run the scheduled tasks twice?


> Say I start two identical workers that have the same schedules set up.

Workers don't run the scheduled tasks; only a scheduler does. You should start only one scheduler but as many workers as you want. wakaq-scheduler and wakaq-worker are the command line interfaces.

> Is there any communication between workers?

Someone I know is working on a solution for this as an open source project. I don't have the link off hand but I'll send him your comment.


Nice project!

I'm one of TaskTiger's current maintainers. Just curious if any specific features are missing (or how it compares in benchmarks)?


When workers claim tasks, TaskTiger moves them to a new queue instead of popping them like WakaQ does. That means if your task fails with WakaQ, it's just gone.

> how it compares in benchmarks

WakaQ processes many tasks per fork, while I think TaskTiger only processes one? That should make WakaQ slightly faster, but it still depends on too many factors to know without benchmarks.

I actually benchmarked TaskTiger in production a long time ago against Celery. From memory, I think it was slower than Celery. I can't find those benchmarks, and even if I could, WakaTime's scale is bigger now so I'm not sure how useful they would be. If you create some benchmarks, I'd love to see them.


> TaskTiger only processes one

Generally yes, though it also supports "batching" tasks, where tasks are `.delay`'d normally but the task is given batches of N sets of args at a time, and the task's code has to understand the batching logic.

:+1: I'd definitely believe the task handling code is slower than Celery. Honestly, I haven't personally done much optimization work on it (though perhaps previous maintainers have, I'm not sure). Performance of TaskTiger itself hasn't really been problematic at all for us in production with hundreds of workers. Usually it's external things (like databases, third-party APIs, etc.) that impact our task throughput the most. Or wanting to shift the architecture entirely to avoid using memory-bound Redis as a queue with an overflow risk.


> Or wanting to shift the architecture entirely to avoid using memory-bound Redis as a queue with an overflow risk.

I wanted to use SSDB[1] instead of Redis for that reason, but it doesn't support the necessary data structures.

[1] https://github.com/ideawu/ssdb


Any link to the architecture of it?


https://github.com/wakatime/wakaq/blob/main/wakaq/__init__.p...

and

https://github.com/wakatime/wakaq/blob/main/wakaq/worker.py

is the meat of it. The blog post talks about the Redis data structures used, and there's not much to it beyond that.


Why not just use Redis streams for the queue instead of the Redis list implementation?


New: Streams are append-only, but I push eta tasks onto the front of a queue when their time is ready to run. It looks like streams could work and might even be better suited than lists, but I would have to figure out some way to run eta tasks before other pending tasks in a queue.

Old, from when I confused streams with pubsub: Streams aren't durable... a worker goes offline for a second and all of a sudden you've lost all your pending tasks. Redis lists are durable, so you can add tasks first, then boot up a worker later and the worker will process those waiting tasks. With streams, the worker wouldn't see any pending tasks when it boots.


AFAIK, streams are treated like any other Redis data type and stored on disk, making them durable. Redis even lets you use an append-only file and the database snapshot for persistence/durability with differing levels of guarantees (e.g. every operation, every n seconds, never). I'm not sure if streams are handled the same in terms of high availability, though...

Also, while plain XREAD doesn't give you pending tasks, using consumer groups and XREADGROUP (specifically with ID = 0 to get all pending tasks) should get you all pending tasks before continuing on to unseen tasks. There are also XCLAIM and XAUTOCLAIM, which let you filter pending tasks by how long they have gone unacknowledged and have another worker claim them.
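
A rough redis-py sketch of that recovery pattern (stream, group, and consumer names are illustrative; XAUTOCLAIM needs Redis >= 6.2):

    import redis

    r = redis.Redis()

    # One-time setup; mkstream creates the stream if it doesn't exist
    try:
        r.xgroup_create("mystream", "mygroup", id="0", mkstream=True)
    except redis.ResponseError:
        pass  # group already exists

    # ID "0": replay this consumer's own pending (delivered, unacked) entries
    pending = r.xreadgroup("mygroup", "worker-1", {"mystream": "0"}, count=100)

    # Claim entries another consumer left unacked for over 60 seconds
    claimed = r.xautoclaim("mystream", "mygroup", "worker-1",
                           min_idle_time=60_000, start_id="0-0")

    # ">": now read entries never delivered to any consumer
    fresh = r.xreadgroup("mygroup", "worker-1", {"mystream": ">"}, count=100)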


Oh sorry, somehow I thought we were talking about pubsub.


It is a good question. Maybe because they aren't conceptually as simple as lists, and they require recent versions of Redis. And maybe because streams persist, which means they use the disk, while lists do not (unless you have incremental dumps turned on).


Why not just use a simple FTP server with curlftpfs to replace Dropbox then? ;)

But honestly, it seems like you did not read the article at all. That seems to be exactly what they do for the queue part, but you still need to manage the workers executing the tasks, lifecycle management, and interfacing/serialization/deserialization for the tasks and results.


From the guidelines:

> Please don't comment on whether someone read an article.


I really like the simplicity (i.e. readability) of the codebase, nice!


I really like the overall design and ethos, but a new project without type hints, pydoc, and at least some tests just isn't the kind of thing I'd bring into my stack. Hope it gets a little bit of polish!


It does have some type hints... but I don't like Python type hints when they require importing an object only for the purpose of types. I only import objects when they're actually used in the non-type code, and I don't like using `if typing.TYPE_CHECKING`.

These days I use Go when I need types.


Why is this a problem? When would you reference the type but not actually need the type? If you're returning the type from some method in your API, but not instantiating it there?


Yes, if you're accepting the type as a param or returning the type. It happens very frequently, and without moving those imports behind "if typing.TYPE_CHECKING", you constantly run into cyclic imports.

https://stackoverflow.com/questions/39740632/python-type-hin...
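
For context, this is the pattern being debated: importing a class only for a type hint, guarded so the import never executes at runtime (module names are illustrative):

    from __future__ import annotations

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # would be a cyclic import if executed at runtime
        from myapp.models import User

    def notify(user: User) -> None:
        # the annotation is resolved only by the type checker
        ...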


It happens very frequently if you are not very good at separating your data model from logic.


Why would there exist a way in Python to conditionally import types, for the purpose of preventing cyclic imports, if cyclic imports weren't a problem?

Your comment makes it seem like you haven't experienced Python types enough, or you wouldn't think it was so easy.


> Your comment makes it seem like you haven't experienced Python types enough, or you wouldn't think it was so easy.

Oh trust me I did and I constantly slap on the wrists juniors who over-complicate their solutions to the problem :)

> Why would there exist a way in Python to conditionally import types, for the purpose of preventing cyclic imports, if cyclic imports weren't a problem?

Because it's easier to understand than the solution to cyclic imports without conditional imports.


I understand the idea of moving "iteratively" etc., but this is a major thing for me when projects are described as "only took a week to get this all done" yet a lot of commonly accepted best practice is left by the wayside.


Tests? We don't need no stinking tests! [1] I didn't plan to open source this so quickly, if ever. I'm just happy it's working for WakaTime. The makefile has plans for pytest, but honestly my motivation for tests has waned after fixing the production issues with Celery.

[1] https://en.wikipedia.org/wiki/Stinking_badges


Well, there's a cold start problem here. Your bar for adoption is understandable, yet OP is probably also seeking substantive feedback on the solution and the problem space. The top HN comments now provide neither. This discourages future posts by people testing the waters. Acceptable individually, but in aggregate the community loses out (e.g. imagine if we had judged Segment and Sentry at launch by these standards).


Seems more like a hobby project than an attempt at a serious production-grade library. But you never know, it might become one some day!


It's being used in production at https://wakatime.com and performing better than Celery was, but yes it's an internal project that was open sourced early stage.


You are pretty damn brave to build your own distributed task queue and put it into production without tests!


I built a local test app that does have some integration tests, and then I just went all in on production. That's why I was surprised it worked so well. I was ready to lose a few nights' sleep debugging it in prod, but thankfully didn't have to.


They are not brave; they are just fixing all issues manually whenever a customer calls them about it.


Ah yes, "hobby projects" you don't want to go paying those any attention ... https://en.wikipedia.org/wiki/History_of_Linux#The_creation_... ...


What does "distributed" mean in this context?

> Each queue is implemented using a Redis list.

That seems to imply that the queue itself is not distributed.

But maybe "distributed" refers to the fact that workers are running on multiple machines? (Though that is a bit confusing to me; sort of like calling Postgres a distributed database because the clients are on multiple machines.)


Distributed means a central broker (Redis) and many distributed workers (multi-process and multi-machine). It's the term used by Celery/RQ/Huey and others.


Modern programming languages should have enough concurrency primitives for building such systems. The Python situation is out of control: we have a bazillion libraries and packages doing the same thing, with slight variations.

This very effort deserves praise, but I have seen it happen many times before: Dramatiq, Huey, RQ, schedule, etc. All one needs to do is wait for those pesky corner cases to start piling up, and for maintainers to be unable to solve them, or not solve them fast enough.

We need primitives, building blocks, to build such systems when needed. We need more control.

I like what Golang does.


Language primitives aren't a substitute for a distributed queue service. You want stuff like persistence, scale-out, and retries if a node falls over and doesn't come back.

Something like Erlang's OTP might have suitable primitives, but only because it's already a distributed system.



