EMPIRE DA  v1.9.1
Data assimilation codes using EMPIRE communication
comms.f90
1 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
2 !!! Time-stamp: <2016-10-18 15:29:44 pbrowne>
3 !!!
4 !!! Module and subroutine to initialise EMPIRE coupling to models
5 !!! Copyright (C) 2014 Philip A. Browne
6 !!!
7 !!! This program is free software: you can redistribute it and/or modify
8 !!! it under the terms of the GNU General Public License as published by
9 !!! the Free Software Foundation, either version 3 of the License, or
10 !!! (at your option) any later version.
11 !!!
12 !!! This program is distributed in the hope that it will be useful,
13 !!! but WITHOUT ANY WARRANTY; without even the implied warranty of
14 !!! MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 !!! GNU General Public License for more details.
16 !!!
17 !!! You should have received a copy of the GNU General Public License
18 !!! along with this program. If not, see <http://www.gnu.org/licenses/>.
19 !!!
20 !!! Email: p.browne @ reading.ac.uk
21 !!! Mail: School of Mathematical and Physical Sciences,
22 !!! University of Reading,
23 !!! Reading, UK
24 !!! RG6 6BB
25 !!!
26 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
27 
28 
56 
57 module comms
58  use output_empire, only : emp_e
60 
61  integer :: cpl_mpi_comm
63  integer :: world_rank
64  integer :: cpl_rank
65  integer :: nproc
66  integer :: pf_mpi_comm
67  integer :: pfrank
68  integer :: npfs
69  integer, allocatable, dimension(:) :: gblcount
71  integer, allocatable, dimension(:) :: gbldisp
74  integer :: nens
75  integer :: cnt
77  integer, allocatable, dimension(:) :: particles
79  integer, allocatable, dimension(:) :: cpl_mpi_comms
81  integer, allocatable, dimension(:) :: state_dims
83  integer, allocatable, dimension(:) :: state_displacements
84 
86  integer, allocatable, dimension(:) :: obs_dims
88  integer, allocatable, dimension(:) :: obs_displacements
89 
91  integer :: mdl_num_proc
93  integer :: pf_member_comm
96  integer :: pf_ens_comm
99  integer :: pf_ens_rank
100  integer :: pf_ens_size
101  integer :: pf_member_rank
103  integer :: pf_member_size
105 contains
106 
107  subroutine allocate_data
108 
109  implicit none
110 
111  end subroutine allocate_data
112 
113  subroutine deallocate_data
114  implicit none
115 
116  end subroutine deallocate_data
117 
119  subroutine initialise_mpi
120  implicit none
121 
122  select case(comm_version)
123  case(0)
124  call user_initialise_mpi
125  case(1)
126  call initialise_mpi_v1
127  case(2)
128  call initialise_mpi_v2
129  case(3)
130  call initialise_mpi_v3
131  case(4)
132  call initialise_mpi_v4
133  case(5)
134  call initialise_mpi_v5
135  case default
136  write(emp_e,*) 'ERROR: comm_version ',comm_version,' not implemente&
137  &d.'
138  write(emp_e,*) 'STOPPING.'
139  stop '-6'
140  end select
141 
142  end subroutine initialise_mpi
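 
  ! Summary of the comm_version options dispatched above, based on a
  ! reading of the routines in this file (an illustrative note, not
  ! authoritative documentation):
  !   0 - communication handled entirely by user-supplied routines
  !       (user_initialise_mpi, user_mpi_send, user_mpi_recv, ...)
  !   1 - one MPI process per ensemble member, single coupling communicator
  !   2 - models parallelised over mdl_num_proc processes; states exchanged
  !       with scatterv/gatherv on per-member communicators
  !   3 - as 2, but the DA processes are also decomposed over the state
  !       vector (pf_member_comm / pf_ens_comm)
  !   4 - the model is run as a subroutine of EMPIRE itself
  !   5 - as 2, but each model instance holds several ensemble members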
143 
145 !> subroutine to make EMPIRE connections and save details into pf_control module
146  subroutine initialise_mpi_v1
147 
148  use pf_control
149  implicit none
150  include 'mpif.h'
151 
152  integer :: mpi_err
153  integer :: couple_colour
154  integer :: myrank
155  integer :: i
156  integer :: da,count
157  integer :: pf_colour
158  integer :: world_size
159 
160  if(comm_version .eq. 2) then
161  call initialise_mpi_v2
162  return
163  end if
164 
165  pf_colour = 10000
166  couple_colour=9999
167  call mpi_init(mpi_err)
168 
169  da = 1
170  call mpi_comm_rank(mpi_comm_world,world_rank, mpi_err)
171  call mpi_comm_size(mpi_comm_world,world_size, mpi_err)
172  call mpi_comm_split(mpi_comm_world,da, world_rank, pf_mpi_comm, mpi_err)
173  call mpi_comm_rank(pf_mpi_comm, pfrank, mpi_err)
174  call mpi_comm_size(pf_mpi_comm, npfs, mpi_err)
175  call mpi_comm_split(mpi_comm_world,couple_colour,world_size,cpl_mpi_comm,mpi_err)
176  call mpi_comm_rank(cpl_mpi_comm, myrank, mpi_err)
177  call mpi_comm_size(cpl_mpi_comm, nens, mpi_err)
178 
179  nens = nens-npfs
180  print*,'DA'
181  print*,'nens = ',nens
182  print*,'npfs = ',npfs
183 
184 
185  !let's find the particles owned by this process:
186 
187  count = ceiling(real((myrank-nens+1)*nens)/real(npfs)) -&
188  & ceiling(real((myrank-nens)*nens)/real(npfs))
189 
190  allocate(pf%particles(count))
191  allocate( particles(count))
192 
193  pf%particles = (/ (i, i = ceiling(real((myrank-nens)*nens)&
194  &/real(npfs)),&
195  ceiling(real((myrank-nens+1)*nens)/real(npfs))-1) /)+1
196  particles = pf%particles
197 
198  allocate(gblcount(npfs))
199  allocate(gbldisp(npfs))
200 
201 
202  call mpi_allgather(count,1,mpi_integer,gblcount,1,mpi_integer&
203  &,pf_mpi_comm,mpi_err)
204 
205 
206  gbldisp = 0
207  if(npfs .gt. 1) then
208  do i = 2,npfs
209  gbldisp(i) = gbldisp(i-1) + gblcount(i-1)
210  end do
211  end if
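  ! Worked example (illustrative numbers only): with nens = 10 ensemble
  ! members and npfs = 3 DA processes, the ceiling formula above gives
  !   pfrank 0: particles 1..4  (count = 4)
  !   pfrank 1: particles 5..7  (count = 3)
  !   pfrank 2: particles 8..10 (count = 3)
  ! so that gblcount = (/4,3,3/) and gbldisp = (/0,4,7/).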
212  pf%count = count
213  cnt = count
214 
215  pf%nens = nens
216  print*,'PF_rank = ',pfrank,' and I own particles ',pf%particles
217 
218  pf_ens_comm=pf_mpi_comm
219  pf_ens_size=npfs
220  pf_ens_rank=pfrank
221  pf_member_rank=0
222  pf_member_size=1
223  end subroutine initialise_mpi_v1
224 
225 
227  subroutine initialise_mpi_v2
228  use pf_control
229  use sizes
230  implicit none
231  include 'mpif.h'
232 
233  integer :: mpi_err
234  integer :: world_size
235  integer,parameter :: da=1
236  integer, parameter :: rk = kind(1.0d0)
237  integer :: i
238  integer :: mdl_procs
239  integer :: first_ptcl
240  integer :: final_ptcl
241  integer :: tmp_cpl_comm
242 
243 
244  call mpi_init(mpi_err)
245  call mpi_comm_rank(mpi_comm_world,world_rank,mpi_err)
246  call mpi_comm_size(mpi_comm_world,world_size,mpi_err)
247  print*,'EMPIRE: rank = ',world_rank,' on mpi_comm_world which has size ',world_size
248 
249 
250  !get the number of processes per model
251  call mpi_allreduce(0,mdl_num_proc,1,mpi_integer,mpi_max&
252  &,mpi_comm_world,mpi_err)
253 
254  if(mdl_num_proc .lt. 1) then
255  write(emp_e,*) 'EMPIRE COMMS v2 ERROR: mdl_num_proc < 1'
256  write(emp_e,*) 'mdl_num_proc = ',mdl_num_proc
257  write(emp_e,*) 'THIS SUGGESTS YOU HAVE NOT LINKED TO A MODEL. STOP.'
258  stop
259  else
260  print*,'mdl_num_proc = ',mdl_num_proc
261  end if
262 
263  !split into models and da processes. create pf_mpi_comm
264  call mpi_comm_split(mpi_comm_world,da,world_rank,pf_mpi_comm,mpi_err)
265  call mpi_comm_size(pf_mpi_comm,npfs,mpi_err)
266  call mpi_comm_rank(pf_mpi_comm,pfrank,mpi_err)
267 
268  !compute number of model processes
269  mdl_procs = world_size-npfs
270  print*,'npfs = ',npfs
271 
272  print*,'mdl_procs = ',mdl_procs
273 
274 
275  !compute number of ensemble members
276  nens = mdl_procs/mdl_num_proc
277  print*,'nens = ',nens
278 
279 
280  !compute range of particles that this mpi process communicates with
281  first_ptcl = ceiling(real(pfrank)*real(nens)/real(npfs))
282  final_ptcl = ceiling(real(pfrank+1)*real(nens)/real(npfs))-1
283  particles = (/ (i, i = first_ptcl,final_ptcl) /)
284  print*,'range of particles = ',first_ptcl,final_ptcl
285 
286 
287  !create a temporary communicator with all associated model processes
288  call mpi_comm_split(mpi_comm_world,pfrank,world_size+pfrank&
289  &,tmp_cpl_comm,mpi_err)
290  print*,'split and created tmp_cpl_comm'
291 
292 
293  ! count the number of particles associated with this process
294  cnt = final_ptcl-first_ptcl+1
295  if(cnt .lt. 1) then
296  write(emp_e,*) 'EMPIRE ERROR: YOU HAVE LAUNCHED MORE EMPIRE DA PROCESSES'
297  write(emp_e,*) 'EMPIRE ERROR: THAN MODELS. I AM REDUNDANT AND STOPPING.'
298  write(emp_e,*) 'EMPIRE ERROR: RECONSIDER HOW YOU EXECUTE NEXT TIME. xx'
299  stop
300  end if
301 
302 
303  !allocate a communicator for each ensemble member
304  allocate(cpl_mpi_comms(cnt))
305 
306 
307  !split the temporary communicator into individual ones for each
308  !ensemble member
309  do i = 1,cnt
310  call mpi_comm_split(tmp_cpl_comm,1,world_size,cpl_mpi_comms(i)&
311  &,mpi_err)
312  write(*,'(A,i3.3,A)') 'created cpl_mpi_comms(',i,')'
313  end do
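  ! Note: mpi_comm_split is collective over tmp_cpl_comm, so the model
  ! processes take part in each of the cnt calls above; the i-th call is
  ! intended to carve out the communicator for the i-th locally owned
  ! ensemble member (its mdl_num_proc model processes plus this DA
  ! process).  Because the key used here (world_size) exceeds any model
  ! process rank, this DA process gets the highest rank in each resulting
  ! communicator, which is why cpl_rank is set to mdl_num_proc below.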
314 
315 
316  !the rank of this mpi process in each of cpl_mpi_comms(:) is the highest
317  cpl_rank = mdl_num_proc
318 
319 
320  !free up the temporary communicator
321  call mpi_comm_free(tmp_cpl_comm,mpi_err)
322 
323 
324  !allocate space to get the sizes from each model process
325  allocate(state_dims(mdl_num_proc+1))
326  allocate(state_displacements(mdl_num_proc+1))
327 
328 
329  !set the sizes to zero; in particular the final entry (this DA process's own contribution to the gather) must stay zero
330  state_dims = 0
331  state_dim = 0
332 
333 
334  print*,'doing a gather on cpl_mpi_comm'
335  !for each ensemble member, gather the number of variables stored
336  !on each process
337  do i = 1,cnt
338  call mpi_gather(state_dim,1,mpi_integer,state_dims&
339  &,1,mpi_integer,cpl_rank,cpl_mpi_comms(i),mpi_err)
340  end do
341  print*,'state_dims = ',state_dims
342 
343 
344  !compute the relevant displacements for the gathers
345  state_displacements = 0
346  do i = 2,mdl_num_proc
347  state_displacements(i:) = state_displacements(i:) + state_dims(i-1)
348  end do
349  print*,'state_displacements = ',state_displacements
350 
351 
352  !compute the total size of the state
353  state_dim = sum(state_dims)
354  print*,'total state_dim = ',state_dim
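  ! Worked example (illustrative numbers only): with mdl_num_proc = 3 and
  ! model processes reporting local sizes 3, 4 and 5, the gather gives
  ! state_dims = (/3,4,5,0/) (the last entry is this DA process's own,
  ! empty, contribution), the loop above gives
  ! state_displacements = (/0,3,7,7/), and state_dim = 12.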
355 
356 
357  !compute counts and displacements of particles associated with da
358  !processes
359  allocate(gblcount(npfs))
360  allocate(gbldisp(npfs))
361 
362 
363  call mpi_allgather(cnt,1,mpi_integer,gblcount,1,mpi_integer&
364  &,pf_mpi_comm,mpi_err)
365  if(mpi_err .eq. 0) then
366  print*,'mpi_allgather successful: gblcount known on all da proc&
367  &esses'
368  print*,'gblcount = ',gblcount
369  else
370  print*,'mpi_allgather unsuccessful'
371  end if
372 
373 
374  gbldisp = 0
375  if(npfs .gt. 1) then
376  do i = 2,npfs
377  gbldisp(i) = gbldisp(i-1) + gblcount(i-1)
378  end do
379  end if
380 
381 
382 
383 
384 
385  pf_ens_comm=pf_mpi_comm
386  pf_ens_size=npfs
387  pf_ens_rank=pfrank
388  pf_member_rank=0
389  pf_member_size=1
390 
391 
392  pf%particles = particles+1
393  pf%count = cnt
394  pf%nens = nens
395  end subroutine initialise_mpi_v2
396 
397 !> subroutine to initialise an even newer version of empire
398  subroutine initialise_mpi_v3
399  use sizes
400  use pf_control
401  implicit none
402  include 'mpif.h'
403 
404  integer :: mpi_err
405  integer :: world_size
406  integer,parameter :: da=1
407  integer, parameter :: rk = kind(1.0d0)
408  integer :: i
409  integer :: mdl_procs
410  integer :: first_ptcl
411  integer :: final_ptcl
412  integer :: tmp_cpl_comm
413  integer :: tmp_cpl_comm2
414  integer :: tmp_cpl_rank
415  integer :: tmp_cpl_colour2
416  integer :: pf_member_colour
417  integer :: pf_ens_colour
418  integer :: status(mpi_status_size)
419 
420 
421  call mpi_init(mpi_err)
422  call mpi_comm_rank(mpi_comm_world,world_rank,mpi_err)
423  call mpi_comm_size(mpi_comm_world,world_size,mpi_err)
424  print*,'EMPIRE: rank = ',world_rank,' on mpi_comm_world which has size ',world_size
425 
426 
427  !get the number of processes per model
428  call mpi_allreduce(0,mdl_num_proc,1,mpi_integer,mpi_max&
429  &,mpi_comm_world,mpi_err)
430 
431  if(mdl_num_proc .lt. 1) then
432  write(emp_e,*) 'EMPIRE COMMS v3 ERROR: mdl_num_proc < 1'
433  write(emp_e,*) 'mdl_num_proc = ',mdl_num_proc
434  write(emp_e,*) 'THIS SUGGESTS YOU HAVE NOT LINKED TO A MODEL. STOP.'
435  stop
436  else
437  print*,'mdl_num_proc = ',mdl_num_proc
438  end if
439 !!!==================================================!!!
440 !!!================ BRUCE LEE =======================!!!
441 !!!==================================================!!!
442 
443 
444  !split into models and da processes. create pf_mpi_comm
445  call mpi_comm_split(mpi_comm_world,da,world_rank,pf_mpi_comm,mpi_err)
446  call mpi_comm_size(pf_mpi_comm,npfs,mpi_err)
447  call mpi_comm_rank(pf_mpi_comm,pfrank,mpi_err)
448 
449 !!!==================================================!!!
450 !!!================ JEAN CLAUDE VAN DAMME ===========!!!
451 !!!==================================================!!!
452 
453 
454  !split pf_mpi_comm into communicator for ensemble member
455  pf_member_colour= pfrank/mdl_num_proc
456  call mpi_comm_split(pf_mpi_comm,pf_member_colour,pfrank&
457  &,pf_member_comm,mpi_err)
458  call mpi_comm_rank(pf_member_comm,pf_member_rank,mpi_err)
459 
460  !split pf_mpi_comm into communicator for ensemble (local to this
461  !part of the state vector)
462  pf_ens_colour = mod(pfrank,mdl_num_proc)
463  call mpi_comm_split(pf_mpi_comm,pf_ens_colour,pfrank,pf_ens_comm&
464  &,mpi_err)
465  call mpi_comm_size(pf_ens_comm,pf_ens_size,mpi_err)
466  call mpi_comm_rank(pf_ens_comm,pf_ens_rank,mpi_err)
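  ! Worked example (illustrative numbers only): with mdl_num_proc = 2 and
  ! four DA processes, pfrank 0,1 get pf_member_colour 0 and pfrank 2,3
  ! get pf_member_colour 1, while pf_ens_colour is 0,1,0,1.  Hence
  ! pf_member_comm groups {0,1} and {2,3} (processes sharing the same
  ! particles but holding different parts of the state vector) and
  ! pf_ens_comm groups {0,2} and {1,3} (the same part of the state vector
  ! across the ensemble).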
467 
468 
469 
470 
471  !compute number of model processes
472  mdl_procs = world_size-npfs
473  npfs = pf_ens_size
474 
475  print*,'npfs = ',npfs
476  print*,'mdl_procs = ',mdl_procs
477 
478 
479  !compute number of ensemble members
480  nens = mdl_procs/mdl_num_proc
481  print*,'nens = ',nens
482 
483 
484  !compute range of particles that this mpi process communicates with
485  first_ptcl = ceiling(real(pf_member_colour)*real(nens)/real(pf_ens_size))
486  final_ptcl = ceiling(real(pf_member_colour+1)*real(nens)/real(pf_ens_size))-1
487  particles = (/ (i, i = first_ptcl,final_ptcl) /)
488  print*,'range of particles = ',first_ptcl,final_ptcl
489 
490 
491  ! count the number of particles associated with this process
492  cnt = final_ptcl-first_ptcl+1
493  if(cnt .lt. 1) then
494  write(emp_e,*) 'EMPIRE ERROR: YOU HAVE LAUNCHED MORE EMPIRE DA PROCESSES'
495  write(emp_e,*) 'EMPIRE ERROR: THAN MODELS. I AM REDUNDANT AND STOPPING.'
496  write(emp_e,*) 'EMPIRE ERROR: RECONSIDER HOW YOU EXECUTE NEXT TIME. xx'
497  stop
498  end if
499 
500  !!!==================================================!!!
501  !!!============== STEVEN SEGAL ======================!!!
502  !!!==================================================!!!
503 
504  !create a temporary communicator with all associated model processes
505  call mpi_comm_split(mpi_comm_world,pf_member_colour,world_size+pf_member_colour&
506  &,tmp_cpl_comm,mpi_err)
507  print*,'split and created tmp_cpl_comm'
508 
509 !!!==================================================!!!
510 !!!============== CHUCK NORRIS ======================!!!
511 !!!==================================================!!!
512 
513  !get the rank and set the colour across the model mpi processes
514  call mpi_comm_rank(tmp_cpl_comm,tmp_cpl_rank,mpi_err)
515  tmp_cpl_colour2 = mod(tmp_cpl_rank,mdl_num_proc)
516 
517  !split this temp communicator into a new temporary one
518  call mpi_comm_split(tmp_cpl_comm,tmp_cpl_colour2,tmp_cpl_rank&
519  &,tmp_cpl_comm2,mpi_err)
520 
521 !!!==================================================!!!
522 !!!=============== JACKIE CHAN ======================!!!
523 !!!==================================================!!!
524 
525  !allocate a communicator for each ensemble member
526  allocate(cpl_mpi_comms(cnt))
527 
528 
529  !split the second temporary communicator into individual ones for each
530  !ensemble member
531  do i = 1,cnt
532  call mpi_comm_split(tmp_cpl_comm2,1,world_size,cpl_mpi_comms(i)&
533  &,mpi_err)
534  write(*,'(A,i3.3,A,i0)') 'created cpl_mpi_comms(',i,') ',cpl_mpi_comms(i)
535  end do
536 
537  !the rank of this mpi process in each of cpl_mpi_comms(:) is the highest
538  cpl_rank = 1
539 
540 
541 !!!==================================================!!!
542 !!!=============== MICHELLE YEOH ====================!!!
543 !!!==================================================!!!
544 
545  !free up the temporary communicators
546  call mpi_comm_free(tmp_cpl_comm ,mpi_err)
547  call mpi_comm_free(tmp_cpl_comm2,mpi_err)
548 
549  state_dim = 0
550 
551 
552  !for each ensemble member, gather the number of variables stored
553  !on each process
554  do i = 1,cnt
555  call mpi_recv(state_dim,1,mpi_integer,0,mpi_any_tag&
556  &,cpl_mpi_comms(i),status,mpi_err)
557  end do
558  print*,'state_dim = ',state_dim
559 
560 !!!==================================================!!!
561 !!!=============== SCARLETT JOHANSSON ===============!!!
562 !!!==================================================!!!
563 
564  !compute the total state dimension:
565  call mpi_allreduce(state_dim,state_dim_g,1,mpi_integer,mpi_sum&
566  &,pf_member_comm,mpi_err)
567  print*,'total state vector size = ',state_dim_g
568 
569 
570  !compute counts and displacements of particles associated with da
571  !processes
572  allocate(gblcount(pf_ens_size))
573  allocate(gbldisp(pf_ens_size))
574 
575 
576  call mpi_allgather(cnt,1,mpi_integer,gblcount,1,mpi_integer&
577  &,pf_ens_comm,mpi_err)
578  if(mpi_err .eq. 0) then
579  print*,'mpi_allgather successful: gblcount known on all da proc&
580  &esses'
581  print*,'gblcount = ',gblcount
582  else
583  print*,'mpi_allgather unsuccessful'
584  end if
585 
586 
587  gbldisp = 0
588  if(npfs .gt. 1) then
589  do i = 2,pf_ens_size
590  gbldisp(i) = gbldisp(i-1) + gblcount(i-1)
591  end do
592  end if
593 
594 
595 
596 
597 
598 
599  pf%particles = particles+1
600  pf%count = cnt
601  pf%nens = nens
602  end subroutine initialise_mpi_v3
603 
604 
606 !> subroutine to initialise empire communicators when the model is to be a subroutine itself
607  subroutine initialise_mpi_v4
608  use pf_control
609  use sizes
610  use output_empire, only : unit_nml
611  implicit none
612  include 'mpif.h'
613 
614  integer :: mpi_err
615  integer :: world_size
616  integer, parameter :: rk = kind(1.0d0)
617  integer :: i
618  integer :: first_ptcl
619  integer :: final_ptcl
620  integer :: ios
621  logical :: file_exists
622  namelist/comms_v4/nens
623 
624 
625  call mpi_init(mpi_err)
626  call mpi_comm_rank(mpi_comm_world,world_rank,mpi_err)
627  call mpi_comm_size(mpi_comm_world,world_size,mpi_err)
628  print*,'EMPIRE: rank = ',world_rank,' on mpi_comm_world which has size ',world_size
629 
630 
631  npfs = world_size
632  pfrank = world_rank
633 
634  !now need to get the total number of ensemble members that will
635  !be used. let us read it in from pf_parameters.dat or empire.nml
636 
637  inquire(file='pf_parameters.dat',exist=file_exists)
638  if(file_exists) then
639  open(unit_nml,file='pf_parameters.dat',iostat=ios,action='read'&
640  &,status='old')
641  if(ios .ne. 0) then
642  write(emp_e,*) 'Cannot open pf_parameters.dat'
643  stop
644  end if
645  else
646  inquire(file='empire.nml',exist=file_exists)
647  if(file_exists) then
648  open(unit_nml,file='empire.nml',iostat=ios,action='read'&
649  &,status='old')
650  if(ios .ne. 0) then
651  write(emp_e,*) 'Cannot open empire.nml'
652  stop
653  end if
654  else
655  write(emp_e,*) 'ERROR: cannot find pf_parameters.dat or empire.nml'
656  stop '-1'
657  end if
658  end if
659  ! set nens to be negative as a default value
660  nens = -1
661  !now read it in
662  read(unit_nml,nml=comms_v4)
663  close(unit_nml)
664 
665  if( nens .lt. 1 ) then
666  write(emp_e,*) 'EMPIRE ERROR: __________initialise_mpi_v4_____________'
667  write(emp_e,*) 'EMPIRE ERROR: nens is less than 1... nens = ',nens
668  write(emp_e,*) 'EMPIRE ERROR: please correctly specify this in empire.n&
669  &ml'
670  stop '-1'
671  end if
672 
673  if (npfs .gt. nens) then
674  write(emp_e,*) 'EMPIRE ERROR: __________initialise_mpi_v4_____________'
675  write(emp_e,*) 'EMPIRE ERROR: npfs is greater than nens...'
676  write(emp_e,*) 'EMPIRE ERROR: npfs = ',npfs,' nens = ',nens
677  stop '-1'
678  end if
679 
680  !compute range of particles that this mpi process communicates with
681  first_ptcl = ceiling(real(pfrank)*real(nens)/real(npfs))
682  final_ptcl = ceiling(real(pfrank+1)*real(nens)/real(npfs))-1
683  particles = (/ (i, i = first_ptcl,final_ptcl) /)
684  print*,'range of particles = ',first_ptcl,final_ptcl
685 
686  !set the da communicator:
687  pf_mpi_comm = mpi_comm_world
688  pf_ens_comm = mpi_comm_world
689  pf_ens_size=npfs
690  pf_ens_rank=pfrank
691 
692  pf_member_rank=0
693  pf_member_size=1
694 
695  ! count the number of particles associated with this process
696  cnt = final_ptcl-first_ptcl+1
697 
698  !compute counts and displacements of particles associated with da
699  !processes
700  allocate(gblcount(npfs))
701  allocate(gbldisp(npfs))
702 
703  do i = 1,npfs
704  gblcount(i) = ceiling(real(i)*real(nens)/real(npfs)) &
705  &- ceiling(real(i-1)*real(nens)/real(npfs))
706  end do
707 
708  gbldisp = 0
709  if(npfs .gt. 1) then
710  do i = 2,npfs
711  gbldisp(i) = gbldisp(i-1) + gblcount(i-1)
712  end do
713  end if
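  ! Worked example (illustrative numbers only): with nens = 5 and
  ! npfs = 2, pfrank 0 owns particles 1..3 and pfrank 1 owns 4..5,
  ! giving gblcount = (/3,2/) and gbldisp = (/0,3/).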
714 
715  pf%particles = particles+1
716  pf%count = cnt
717  pf%nens = nens
718 
719  end subroutine initialise_mpi_v4
720 
721 
723 !> subroutine to initialise empire communication pattern similarly to v2 but with multiple ensemble members per model instance
724  subroutine initialise_mpi_v5
725  use pf_control
726  use sizes
727  implicit none
728  include 'mpif.h'
729 
730  integer :: mpi_err
731  integer :: world_size
732  integer, parameter :: da=1
733  integer, parameter :: rk = kind(1.0d0)
734  integer :: i,j
735  integer :: mdl_procs
736  integer :: first_ptcl
737  integer :: final_ptcl
738  integer :: tmp_cpl_comm
739  integer :: n_mdl_instances
740  integer :: nens_per_instance
741 
742  call mpi_init(mpi_err)
743  call mpi_comm_rank(mpi_comm_world,world_rank,mpi_err)
744  call mpi_comm_size(mpi_comm_world,world_size,mpi_err)
745  print*,'EMPIRE: rank = ',world_rank,' on mpi_comm_world which has size ',world_size
746 
747 
748  !get the number of processes per model
749  call mpi_allreduce(0,mdl_num_proc,1,mpi_integer,mpi_max&
750  &,mpi_comm_world,mpi_err)
751 
752  if(mdl_num_proc .lt. 1) then
753  write(emp_e,*) 'EMPIRE COMMS v5 ERROR: mdl_num_proc < 1'
754  write(emp_e,*) 'mdl_num_proc = ',mdl_num_proc
755  write(emp_e,*) 'THIS SUGGESTS YOU HAVE NOT LINKED TO A MODEL. STOP.'
756  stop
757  else
758  print*,'mdl_num_proc = ',mdl_num_proc
759  end if
760 
761  !get the number of ensemble members per model process
762  call mpi_allreduce(0,nens_per_instance,1,mpi_integer,mpi_max&
763  &,mpi_comm_world,mpi_err)
764  if(nens_per_instance .lt. 1) then
765  write(emp_e,*) 'EMPIRE COMMS v5 ERROR: nens_per_instance < 1'
766  write(emp_e,*) 'nens_per_instance = ',nens_per_instance
767  write(emp_e,*) 'THIS SUGGESTS YOU HAVE NOT LINKED TO A MODEL. STOP.'
768  stop
769  else
770  print*,'nens_per_instance = ',nens_per_instance
771  end if
772  call flush(6)
773 
774 
775  !split into models and da processes. create pf_mpi_comm
776  call mpi_comm_split(mpi_comm_world,da,world_rank,pf_mpi_comm,mpi_err)
777  call mpi_comm_size(pf_mpi_comm,npfs,mpi_err)
778  call mpi_comm_rank(pf_mpi_comm,pfrank,mpi_err)
779 
780  !compute number of model processes
781  mdl_procs = world_size-npfs
782  print*,'npfs = ',npfs
783 
784  print*,'mdl_procs = ',mdl_procs
785 
786 
787  !compute number of ensemble members
788  n_mdl_instances = mdl_procs/mdl_num_proc
789  print*,'n_mdl_instances = ',n_mdl_instances
790 
791  nens = n_mdl_instances*nens_per_instance
792  print*,'nens = ',nens
793 
794 
795  !compute range of particles that this mpi process communicates with
796  first_ptcl = ceiling(real(pfrank)*real(nens)/real(npfs))
797  final_ptcl = ceiling(real(pfrank+1)*real(nens)/real(npfs))-1
798  particles = (/ (i, i = first_ptcl,final_ptcl) /)
799  print*,'range of particles = ',first_ptcl,final_ptcl
800 
801 
802  ! count the number of particles associated with this process
803  cnt = final_ptcl-first_ptcl+1
804  if(cnt .lt. 1) then
805  write(emp_e,*) 'EMPIRE ERROR: YOU HAVE LAUNCHED MORE EMPIRE DA PROCESSES'
806  write(emp_e,*) 'EMPIRE ERROR: THAN MODELS. I AM REDUNDANT AND STOPPING.'
807  write(emp_e,*) 'EMPIRE ERROR: RECONSIDER HOW YOU EXECUTE NEXT TIME. xx'
808  stop
809  end if
810 
811 
812  !allocate a communicator for each ensemble member
813  allocate(cpl_mpi_comms(cnt))
814 
815  print*,'mod(n_mdl_instances,npfs) = ',mod(n_mdl_instances,npfs)
816 
817  if(mod(n_mdl_instances,npfs) .ne. 0 ) then !number of instances
818  !of the model executable doesn't divide the number of empire
819  !processes launched. this makes it difficult to get an efficient
820  !splitting, so let's just do this sequentially for each
821  !ensemble member...
822  j = 0
823  print*,'EMPIRE V5: SEQUENTIAL SPLITTING ON MPI_COMM_WORLD...'
824  print*,'nens = ',nens
825  print*,'first_ptcl = ',first_ptcl
826  print*,'final_ptcl = ',final_ptcl
827  do i = 0,nens-1
828  if( i .ge. first_ptcl .and. i .le. final_ptcl) then
829  !we should be in this communicator
830  j = j + 1
831  call mpi_comm_split(mpi_comm_world,1,world_size,cpl_mpi_comms(j)&
832  &,mpi_err)
833  write(*,'(A,i3.3,A)') 'created cpl_mpi_comms(',j,')'
834  else
835  ! we should not be part of this communicator
836  call mpi_comm_split(mpi_comm_world,0,world_size,tmp_cpl_comm,mpi_err)
837  call mpi_comm_free(tmp_cpl_comm,mpi_err)
838  end if
839  end do
840 
841 
842  else !number of model executables does divide the number of
843  !empire processes: that way we can split easily...
844 
845  !first split based on pfrank
846  print*,'doing first split based on pfrank',pfrank
847  call mpi_comm_split(mpi_comm_world,pfrank,world_size,tmp_cpl_comm,mpi_err)
848  print*,'finished first split mpi_err = ',mpi_err
849  call flush(6)
850  do i = 1,cnt
851  print*,'i = ',i
852  call mpi_comm_split(tmp_cpl_comm,1,world_size,cpl_mpi_comms(i)&
853  &,mpi_err)
854  write(*,'(A,i3.3,A)') 'created cpl_mpi_comms(',i,')'
855  end do
856  call mpi_comm_free(tmp_cpl_comm,mpi_err)
857  end if
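  ! Example of when each branch is taken (illustrative numbers only,
  ! assuming each instance holds consecutive ensemble members): with
  ! n_mdl_instances = 3, npfs = 2 and nens_per_instance = 2 we have
  ! nens = 6 and mod(3,2) /= 0, so the sequential branch is used (the
  ! middle model instance holds one member coupled to each DA process,
  ! so a simple pfrank-based split would not work).  With
  ! n_mdl_instances = 4 and npfs = 2 the simpler else branch applies.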
858 
859  print*,'EMPIRE: all communicators made'
860 
861  !the rank of this mpi process in each of cpl_mpi_comms(:) is the highest
862  cpl_rank = mdl_num_proc
863 
864  !allocate space to get the sizes from each model process
865  allocate(state_dims(mdl_num_proc+1))
866  allocate(state_displacements(mdl_num_proc+1))
867 
868 
869  !set the sizes to zero; in particular the final entry (this DA process's own contribution to the gather) must stay zero
870  state_dims = 0
871  state_dim = 0
872 
873 
874  print*,'doing a gather on cpl_mpi_comm'
875  print*,'cnt = ',cnt
876  !for each ensemble member, gather the number of variables stored
877  !on each process
878  do i = 1,cnt
879  print*,'cpl_mpi_comms(i) = ',cpl_mpi_comms(i),cpl_rank
880  call mpi_gather(state_dim,1,mpi_integer,state_dims&
881  &,1,mpi_integer,cpl_rank,cpl_mpi_comms(i),mpi_err)
882  end do
883  print*,'state_dims = ',state_dims
884 
885 
886  !compute the relevant displacements for the gathers
887  state_displacements = 0
888  do i = 2,mdl_num_proc
889  state_displacements(i:) = state_displacements(i:) + state_dims(i-1)
890  end do
891  print*,'state_displacements = ',state_displacements
892 
893 
894  !compute the total size of the state
895  state_dim = sum(state_dims)
896  print*,'total state_dim = ',state_dim
897 
898 
899  !compute counts and displacements of particles associated with da
900  !processes
901  allocate(gblcount(npfs))
902  allocate(gbldisp(npfs))
903 
904 
905  call mpi_allgather(cnt,1,mpi_integer,gblcount,1,mpi_integer&
906  &,pf_mpi_comm,mpi_err)
907  if(mpi_err .eq. 0) then
908  print*,'mpi_allgather successful: gblcount known on all da proc&
909  &esses'
910  print*,'gblcount = ',gblcount
911  else
912  print*,'mpi_allgather unsuccessful'
913  end if
914 
915 
916  gbldisp = 0
917  if(npfs .gt. 1) then
918  do i = 2,npfs
919  gbldisp(i) = gbldisp(i-1) + gblcount(i-1)
920  end do
921  end if
922 
923  pf_ens_comm = pf_mpi_comm
924  pf_ens_size=npfs
925  pf_ens_rank=pfrank
926 
927  pf_member_rank=0
928  pf_member_size=1
929 
930  pf%particles = particles+1
931  pf%count = cnt
932  pf%nens = nens
933  end subroutine initialise_mpi_v5
934 
935 
936 
937 
938 
939 
940 
941 
942 
943 
944 !> subroutine to send all the model states to the models
945  subroutine send_all_models(stateDim,nrhs,x,tag)
946  implicit none
947  include 'mpif.h'
948  integer, intent(in) :: stateDim
949  integer, intent(in) :: nrhs
950  real(kind=kind(1.0d0)), intent(in), dimension(stateDim,nrhs) :: x
951  integer, intent(in) :: tag
952  integer :: k
953  integer :: mpi_err
954  integer :: particle
955  real(kind=kind(1.0d0)), dimension(0) :: send_null
956  select case(comm_version)
957  case(0)
958  call user_mpi_send(statedim,nrhs,x,tag)
959  case(1)
960  do k =1,cnt
961  particle = particles(k)
962  call mpi_send(x(:,k),statedim,mpi_double_precision&
963  &,particle-1,tag,cpl_mpi_comm,mpi_err)
964  end do
965  case(2,5)
966  do k = 1,cnt
967  call mpi_scatterv(x(:,k),state_dims,state_displacements&
968  &,mpi_double_precision,send_null,0,mpi_double_precision&
969  &,cpl_rank,cpl_mpi_comms(k),mpi_err)
970  call mpi_bcast(tag,1,mpi_integer,cpl_rank,cpl_mpi_comms(k)&
971  &,mpi_err)
972  end do
973  case(3)
974  do k = 1,cnt
975  call mpi_send(x(:,k),statedim,mpi_double_precision&
976  &,0,tag,cpl_mpi_comms(k),mpi_err)
977  end do
978  case(4)
979  do k = 1,cnt
980  particle = particles(k)
981  call model_as_subroutine_start(x(:,k),particle,tag)
982  end do
983  case default
984  write(emp_e,*) 'EMPIRE ERROR: THIS ISNT BACK TO THE FUTURE. comm_ver&
985  &sion not yet implemented'
986  stop
987  end select
988  end subroutine send_all_models
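  ! Sketch of the model-side counterpart for comm_version 1 (an
  ! illustration only; the real model-side code lives outside this file,
  ! and the names cpl_root and cpl_mpi_comm as used on the model side are
  ! assumptions here):
  !   call mpi_recv(x, state_dim, MPI_DOUBLE_PRECISION, cpl_root, &
  !        MPI_ANY_TAG, cpl_mpi_comm, status, mpi_err)
  !   ! ... read the tag from status, advance the model one timestep,
  !   ! ... then send the updated state back:
  !   call mpi_send(x, state_dim, MPI_DOUBLE_PRECISION, cpl_root, &
  !        tag, cpl_mpi_comm, mpi_err)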
989 
990 
991 !> subroutine to receive all the model states from the models after
992  !it has updated them one timestep
993  subroutine recv_all_models(stateDim,nrhs,x)
994  use timestep_data
995  implicit none
996  include 'mpif.h'
997  integer, intent(in) :: stateDim
998  integer, intent(in) :: nrhs
999  real(kind=kind(1.0d0)), intent(out), dimension(stateDim,nrhs) :: x
1000  integer :: k
1001  integer, dimension(MPI_STATUS_SIZE) :: mpi_status
1002  integer :: mpi_err
1003  integer :: particle
1004  real(kind=kind(1.0d0)), dimension(0) :: send_null
1005  select case(comm_version)
1006  case(0)
1007  call user_mpi_recv(statedim,nrhs,x)
1008  case(1)
1009  DO k = 1,cnt
1010  particle = particles(k)
1011  CALL mpi_recv(x(:,k), statedim, mpi_double_precision, &
1012  particle-1, mpi_any_tag, cpl_mpi_comm,mpi_status, mpi_err)
1013  END DO
1014  case(2,5)
1015  do k = 1,cnt
1016  call mpi_gatherv(send_null,0,mpi_double_precision,x(:,k)&
1017  &,state_dims,state_displacements,mpi_double_precision,cpl_rank&
1018  &,cpl_mpi_comms(k),mpi_err)
1019  end do
1020  case(3)
1021  DO k = 1,cnt
1022  CALL mpi_recv(x(:,k), statedim, mpi_double_precision, &
1023  0, mpi_any_tag, cpl_mpi_comms(k),mpi_status, mpi_err)
1024  END DO
1025  case(4)
1026  do k = 1,cnt
1027  particle = particles(k)
1028  call model_as_subroutine_return(x(:,k),particle)
1029  end do
1030  case default
1031  write(emp_e,*) 'EMPIRE ERROR: THIS ISNT BACK TO THE FUTURE PART 2. comm_ver&
1032  &sion not yet implemented'
1033  stop
1034  end select
1035 
1036 
1037  !at this point the ensemble has been updated by the model so is
1038  !no longer an analysis
1039  call timestep_data_set_no_analysis
1040  end subroutine recv_all_models
1041 
1042 
1043 !> subroutine to receive all the model states from the models after
1044  !it has updated them one timestep in a non-blocking manner
1045  subroutine irecv_all_models(stateDim,nrhs,x,requests)
1046  implicit none
1047  include 'mpif.h'
1048  integer, intent(in) :: stateDim
1049  integer, intent(in) :: nrhs
1050  real(kind=kind(1.0d0)), intent(out), dimension(stateDim,nrhs) :: x
1051  integer, dimension(nrhs), intent(inout) :: requests
1052  integer :: k
1053  integer :: mpi_err
1054  integer :: particle
1055  real(kind=kind(1.0d0)), dimension(0) :: send_null
1056  integer, dimension(MPI_STATUS_SIZE) :: mpi_status
1057  select case(comm_version)
1058  case(0)
1059  call user_mpi_irecv(statedim,nrhs,x,requests)
1060  case(1)
1061  DO k = 1,cnt
1062  particle = particles(k)
1063  CALL mpi_irecv(x(:,k), statedim, mpi_double_precision, &
1064  particle-1, mpi_any_tag, cpl_mpi_comm,requests(k), mpi_err)
1065  end DO
1066  case(2,5)
1067  do k = 1,cnt
1068  !I DON'T THINK MY VERSION OF MPI HAS THE MPI_IGATHERV
1069  !call mpi_igatherv(send_null,0,MPI_DOUBLE_PRECISION,x(:,k)&
1070  ! &,state_dims,state_displacements,MPI_DOUBLE_PRECISION,cpl_rank&
1071  ! &,cpl_mpi_comms(k),requests(k),mpi_err)
1072 
1073  !replace with standard blocking gather
1074  call mpi_gatherv(send_null,0,mpi_double_precision,x(:,k)&
1075  &,state_dims,state_displacements,mpi_double_precision,cpl_rank&
1076  &,cpl_mpi_comms(k),mpi_err)
1077 
1078  !get the requests to be null
1079  call mpi_isend(send_null,0,mpi_double_precision,cpl_rank&
1080  &,1,cpl_mpi_comms(k),requests(k),mpi_err)
1081  CALL mpi_recv(send_null,0,mpi_double_precision,cpl_rank&
1082  &,1,cpl_mpi_comms(k),mpi_status, mpi_err)
1083 
1084  end do
1085  case(3)
1086  DO k = 1,cnt
1087  CALL mpi_irecv(x(:,k), statedim, mpi_double_precision, &
1088  0, mpi_any_tag, cpl_mpi_comms(k),requests(k), mpi_err)
1089  end DO
1090  case(4)
1091  do k = 1,cnt
1092  particle = particles(k)
1093  call model_as_subroutine_return(x(:,k),particle)
1094  ! the following may be totally wrong and need to change
1095  ! to something like the v2 case! see if it breaks if
1096  ! this is ever called.
1097  requests(k) = mpi_request_null
1098  end do
1099  case default
1100  write(emp_e,*) 'EMPIRE ERROR: THIS ISNT BACK TO THE FUTURE PART 3. comm_ver&
1101  &sion not yet implemented'
1102  stop
1103  end select
1104 
1105  end subroutine irecv_all_models
1106 
1107 
1108  subroutine verify_sizes
1109  use sizes
1110  implicit none
1111  include 'mpif.h'
1112  integer :: mpi_err,i
1113 
1114  select case(comm_version)
1115  case(1,2,4,5)
1116 
1117  state_dim_g = state_dim
1118  obs_dim_g = obs_dim
1119  pf_ens_rank = pfrank
1120 
1121  case(3)
1122  if(allocated(obs_dims)) deallocate(obs_dims)
1123  allocate(obs_dims(pf_member_size))
1124  if(allocated(obs_displacements)) deallocate(obs_displacements)
1125  allocate(obs_displacements(pf_member_size))
1126 
1127  call mpi_allgather(obs_dim,1,mpi_integer,obs_dims,1&
1128  &,mpi_integer,pf_member_comm,mpi_err)
1129 
1130  obs_displacements(1) = 0
1131  if(pf_member_size .gt. 1) then
1132  do i = 2,pf_member_size
1133  obs_displacements(i) = obs_displacements(i-1) + obs_dims(i&
1134  &-1)
1135  end do
1136  end if
1137 
1138 
1139  if(.not. allocated(state_dims)) then
1140  allocate(state_dims(pf_member_size))
1141  allocate(state_displacements(pf_member_size))
1142 
1143  call mpi_allgather(state_dim,1,mpi_integer,state_dims,1&
1144  &,mpi_integer,pf_member_comm,mpi_err)
1145 
1146  state_displacements(1) = 0
1147  if(pf_member_size .gt. 1) then
1148  do i = 2,pf_member_size
1149  state_displacements(i) = state_displacements(i-1) +&
1150  & state_dims(i-1)
1151  end do
1152  end if
1153  end if
1154  case default
1155  print*,'EMPIRE ERROR: COMM VERSION IN VERIFY SIZES NOT IMPLEMENTED'
1156  end select
1157 
1158  end subroutine verify_sizes
1159 
1160 end module comms
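 
!> Illustrative usage sketch (not part of the original source): a minimal
!! DA-side driver loop built on the routines above.  The loop bound n_steps
!! and the tag value passed to send_all_models are placeholders, and
!! comm_version is assumed to have been set elsewhere; the real drivers
!! live elsewhere in EMPIRE.
!
!  program empire_driver_sketch
!    use comms
!    use sizes
!    implicit none
!    real(kind=kind(1.0d0)), allocatable :: x(:,:)
!    integer :: t, n_steps
!    call initialise_mpi                        ! dispatch on comm_version
!    call verify_sizes                          ! make state_dim_g etc. consistent
!    allocate(x(state_dim,cnt))                 ! one column per local particle
!    n_steps = 10                               ! placeholder
!    call recv_all_models(state_dim,cnt,x)      ! initial model states
!    do t = 1, n_steps
!       ! ... assimilation / perturbation of x would go here ...
!       call send_all_models(state_dim,cnt,x,1) ! tag semantics are model-defined
!       call recv_all_models(state_dim,cnt,x)   ! states after one model timestep
!    end do
!  end program empire_driver_sketch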