Girder Worker with Girder: a missing link / a linking issue

Dear Girder experts,

I’m now interested in girder-worker (using python tasks first). I spent time reading the following document:

I follow the steps indicated in the documentation and tutorial:

  • install girder-worker and rabbitmq-server [done]
  • creating a task [done]
  • getting start tutorial of girder-worker [done]

Question

My objective is to use Girder Worker with Girder (using python tasks first). From my basic understanding, the celeri approach make thing quite simple. But I missed one important information. I got the following error when creating the job in a girder plugin. Could you indicate to what refer the missing link ? Where should I define the getWorkerApiUrl ? Please find below additionnal information regarding the status/location of both girder server and girder worker:

Thanks in advance,
Valéry

async_result = fibonacci.delay(2)
Traceback (most recent call last):
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_worker/app.py", line 55, in girder_before_task_publish
    job = context.create_task_job(
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_worker/context/girder_context.py", line 27, in create_task_job
    job = job_model.createJob(
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_jobs/models/job.py", line 251, in createJob
    job = self.save(job)
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_jobs/models/job.py", line 271, in save
    job = super().save(job, *args, **kwargs)
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder/models/model_base.py", line 506, in save
    events.trigger('model.%s.save.after' % self.name, document)
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder/events.py", line 292, in trigger
    handler(e)
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_worker/girder_plugin/event_handlers.py", line 159, in attachJobInfoSpec
    Job().updateJob(job, otherFields={'jobInfoSpec': jobInfoSpec(job)})
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_worker/girder_plugin/utils.py", line 167, in jobInfoSpec
    'url': '/'.join((getWorkerApiUrl(), 'job', str(job['_id']))),
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_worker/girder_plugin/utils.py", line 51, in getWorkerApiUrl
    return apiUrl or getApiUrl()
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder/api/rest.py", line 84, in getApiUrl
    raise GirderException('Could not determine API root in %s.' % url)
girder.exceptions.GirderException: Could not determine API root in /.

Additionnal information

  • Girder-server and girder worker are both install on the some computer.
  • On the girder-server side, I use the defaut link, the girder worker plugin seems activated (see picture)
  • On the girder-worker side, I used the following comand : celery worker -A girder_worker.app -l info that return


on passe dans le plugin GwTaskPlugin pour importer la tache
/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/celery/backends/amqp.py:65: CPendingDeprecationWarning: 
    The AMQP result backend is scheduled for deprecation in     version 4.0 and removal in version v5.0.     Please use RPC backend or a persistent backend.

  deprecated.warn(
 
 -------------- celery@ihusux001 v4.4.7 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-5.4.0-91-generic-x86_64-with-glibc2.29 2022-01-14 16:52:48
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         girder_worker:0x7fd7ca7858b0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 80 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . girder_worker.docker.tasks.docker_run
  . gw_task_plugin.tasks.example_task
  . gw_task_plugin.tasks.fibonacci
  . gw_task_plugin.tasks.my_plus_task
  . gwexample.analyses.tasks.fibonacci
  . slicer_cli_web.girder_worker_plugin.direct_docker_run.run

[2022-01-14 16:52:50,912: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2022-01-14 16:52:50,922: INFO/MainProcess] mingle: searching for neighbors
[2022-01-14 16:52:51,940: INFO/MainProcess] mingle: all alone
[2022-01-14 16:52:51,964: INFO/MainProcess] celery@ihusux001 ready.
[2022-01-14 16:54:11,066: INFO/MainProcess] Received task: gw_task_plugin.tasks.example_task[3d06956d-b0e7-44c3-a433-f514b43d9434]  
[2022-01-14 16:54:11,069: WARNING/ForkPoolWorker-64] [2022-01-14 16:54:11,069] WARNING: No jobInfoSpec. Setting job_manager to None.
[2022-01-14 16:54:11,069: WARNING/ForkPoolWorker-64] No jobInfoSpec. Setting job_manager to None.
[2022-01-14 16:54:11,107: INFO/ForkPoolWorker-64] Task gw_task_plugin.tasks.example_task[3d06956d-b0e7-44c3-a433-f514b43d9434] succeeded in 0.022846607957035303s: 'hello world'
[2022-01-14 16:54:52,159: INFO/MainProcess] Received task: gw_task_plugin.tasks.fibonacci[93a2d814-5a77-4c9f-8627-ab7f5636aba7]  
[2022-01-14 16:54:52,161: WARNING/ForkPoolWorker-64] [2022-01-14 16:54:52,161] WARNING: No jobInfoSpec. Setting job_manager to None.
[2022-01-14 16:54:52,161: WARNING/ForkPoolWorker-64] No jobInfoSpec. Setting job_manager to None.
[2022-01-14 16:54:52,182: INFO/ForkPoolWorker-64] Task gw_task_plugin.tasks.fibonacci[93a2d814-5a77-4c9f-8627-ab7f5636aba7] succeeded in 0.011903629172593355s: 55
[2022-01-14 16:55:05,128: INFO/MainProcess] Received task: gw_task_plugin.tasks.fibonacci[1828bf21-d78e-451b-93c5-5344deeba107]  
[2022-01-14 16:55:05,130: WARNING/ForkPoolWorker-64] [2022-01-14 16:55:05,130] WARNING: No jobInfoSpec. Setting job_manager to None.
[2022-01-14 16:55:05,130: WARNING/ForkPoolWorker-64] No jobInfoSpec. Setting job_manager to None.
[2022-01-14 16:55:25,266: INFO/ForkPoolWorker-64] Task gw_task_plugin.tasks.fibonacci[1828bf21-d78e-451b-93c5-5344deeba107] succeeded in 20.12778534181416s: 832040
[2022-01-14 17:04:40,151: INFO/MainProcess] Received task: gw_task_plugin.tasks.fibonacci[8c4b0c79-501d-45d0-94f9-4865cde5580f]  
[2022-01-14 17:04:40,153: WARNING/ForkPoolWorker-64] [2022-01-14 17:04:40,153] WARNING: No jobInfoSpec. Setting job_manager to None.
[2022-01-14 17:04:40,153: WARNING/ForkPoolWorker-64] No jobInfoSpec. Setting job_manager to None.
[2022-01-14 17:04:40,167: INFO/ForkPoolWorker-64] Task gw_task_plugin.tasks.fibonacci[8c4b0c79-501d-45d0-94f9-4865cde5580f] succeeded in 0.006332071032375097s: 1

By default, if you start a girder_worker job via a REST call in Girder, it will use the url used to access the rest endpoint for girder_worker to refer back to Girder. However, if you invoke this directly (not in a rest call) or the machine on which grider_worker is running cannot resolve the url’s host and port to reach girder in the same manner as the REST call that started the job, then the worker won’t know how to reach back to Girder. In the Girder plugin settings for the Worker plugin, there is an “Alternative Girder API URL” which can be set – this should be the value needed for girder_worker to reach girder and is typically something like “http://localohost:8080/api/v1”.

– David

Thanks for your help David. I have been able to run the fibonacci jobs inside Girder. I still have some trouble to get the result of my job. Please correct me if I’m wrong.

I can either run:

async_result = fibonacci.delay(26)
    
# Print the result of fib call
print(async_result.get())

but in this case I lose the interesting part with the asynchronous part. To overcome this limitation, once the job is launched I need to use the mecanism with GirderUploadToItem (explained in girder_worker/TRANSITION.md at master · girder/girder_worker · GitHub). Unfortunately, the task is running without the GirderUploadToItem while it returns an error message with. Could you suggest why. In case, I indicate the relevant code/log on.

Additionally, what would be the most appropriate way to retrieve the log and status during the task (like simple print) ?

Thanks
Best
Valéry


Producer girder server inside a plugin

dict_with_file_information['itemId']  

return

ObjectId('61eee9b6d255641bb0bf0dd5')
def call_girder_worker_outpath(number, item_id):
   #async_result = fibonacci.delay(  number,  girder_job_title="A Custom Job Title")
    # Create a token with permissions of the current user.
   token = Token().createToken(user=getCurrentUser()) 

   async_result =make_matplotlib.delay(99,
    girder_job_title='Upload images Job',
	girder_job_type='my_custom_type',
	girder_job_public=True,
        girder_client_token=token['_id'], 
         girder_result_hooks=[
		GirderUploadToItem(
            str(item_id),
            delete_file=True
        ),
	])

   return async_result.job

Consumer side task:

@app.task
@argument( 'n', types.Integer, min=1)
def make_matplotlib(n):
    """Make a matplotlib figure."""
    print(n)
    output_path='/tmp/figure_1_stream_angle_all_co_all_direction_0_zoom_view_plane_0_0000.png'
    return output_path 

error message

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/hello_world/__init__.py", line 267, in call_girder_worker_outpath
    GirderUploadToItem(
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_worker_utils/transforms/girder_io.py", line 140, in __init__
    super(GirderUploadToItem, self).__init__(**kwargs)
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_worker_utils/transforms/girder_io.py", line 44, in __init__
    token = getCurrentToken()['_id']
TypeError: 'NoneType' object is not subscriptable

Hi,

I made some small progress. Jobs are now nicely running on my girder-worker. The job are starting from a plugin in girder server. I’m now reading the celery documentation.

Unfortunately, I’m completely stuck with the “girder_worker_utils.transforms.girder_io”. Either for input or output. A typical error is below localized in the .init of the class of GirderClientTransform/girder_io.py. It looks like I’m not recognized.

This is not critical for seeding the input of a job, I used the exact path but I have not idea how to get the result without the fonction GirderUploadToItem.

Thanks in advance for your help.

task on the girder worker

@app.task(bind=True)
def read_nifti_file_simple_task(self, filename):
    with open(filename, 'rb') as f:
        rr = f.read()
        bb = BytesIO(rr)
        fh = FileHolder(fileobj=bb)
        nib_image = nib.Nifti1Image.from_file_map({'header': fh, 'image': fh}) 
    
    affine=nib_image.get_affine()
    print(affine)
   
    return 'hello world, I read the file' 

1) with the exact path


path_for_task=dict_with_file_information['path']  
full_path='/home/valeryozenne/Dev/Girder-Magasin/' + path_for_task
call_girder_worker_read_nifti_simple(full_path, creator)

it returns

[2022-02-04 16:04:54,450: INFO/MainProcess] Received task: gwexample.analyses.tasks.read_nifti_file_simple_task[733052b4-8024-442b-9cce-39f160ab7c5a]  
[2022-02-04 16:05:12,172: WARNING/ForkPoolWorker-1] [[ 1.56000003e-01  0.00000000e+00 -0.00000000e+00 -0.00000000e+00]
 [ 0.00000000e+00  1.56000003e-01 -0.00000000e+00  4.08768699e-07]
 [ 0.00000000e+00  0.00000000e+00  1.40000001e-01  7.56978977e-08]
 [ 0.00000000e+00  0.00000000e+00  0.00000000e+00  1.00000000e+00]]
[2022-02-04 16:05:12,262: INFO/ForkPoolWorker-1] Task gwexample.analyses.tasks.read_nifti_file_simple_task[733052b4-8024-442b-9cce-39f160ab7c5a] succeeded in 17.7387371819932s: 'hello world, I read the file'

2) with girderIO

file_id=dict_with_file_information['_id']        
call_girder_worker_read_nifti_simple(GirderFileId(file['_id']), creator)  

it failed in the plugin

Traceback (most recent call last):
  File "<string>", line 2, in <module>
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_worker_utils/transforms/girder_io.py", line 65, in __init__
    super(GirderFileId, self).__init__(**kwargs)
  File "/home/valeryozenne/Dev/Girder/test-girder/lib/python3.8/site-packages/girder_worker_utils/transforms/girder_io.py", line 44, in __init__
    token = getCurrentToken()['_id']
TypeError: 'NoneType' object is not subscriptable

Additional information

def call_girder_worker_read_nifti_simple(full_path, creator):
   #async_result = fibonacci.delay(  number,  girder_job_title="A Custom Job Title")
    # Create a token with permissions of the current user.
   token = Token().createToken(user=creator) 

   async_result = read_nifti_file_simple_task.delay(full_path, 
                 girder_job_title='nifti Job', 
                 	girder_job_type='my_nifti_type',	
                     girder_job_public=True, 
                     girder_client_token=token['_id']
                     )

   return async_result.job

creator return

{'_id': ObjectId('61962758ed...e985d01e'), 'login': 'admin', 'email': 'xxxxxxxx@xxxxxxxxxxx', 'firstName': 'valery', 'lastName': 'ozenne', 'created': datetime.datetime(20...44, 55000), 'emailVerified': True, 'status': 'enabled', 'admin': True, 'size': 0, 'groups': [], 'groupInvites': [], 'salt': '$2b$12$34Hq/TVY.8Y5w...llflkNHVhm', 'public': True, ...}

Hi,

It took some times but we finally find a way to solve this issue. It is related to the binding event.

GirderFileId(str(file['_id']))

returns the mentioned error above if the binding event is

events.bind('data.process','data', _HandlerFunction) 

While it succeed if we use:

events.bind('model.file.save.after', 'after', _HandlerFunction) 

This also solve the issue to send back the result of the job in girder worker to the girder database via

[...] GirderUploadToItem (str(item_id)) [...]

I would be interested to know the logic behind ?
Thanks in advance,
Valéry