17
17
use Symfony \Bridge \PhpUnit \ExpectDeprecationTrait ;
18
18
use Symfony \Bundle \FrameworkBundle \DependencyInjection \Compiler \AddAnnotationsCachedReaderPass ;
19
19
use Symfony \Bundle \FrameworkBundle \DependencyInjection \FrameworkExtension ;
20
+ use Symfony \Bundle \FrameworkBundle \Tests \Fixtures \Messenger \BarMessage ;
21
+ use Symfony \Bundle \FrameworkBundle \Tests \Fixtures \Messenger \DefaultSchedule ;
20
22
use Symfony \Bundle \FrameworkBundle \Tests \Fixtures \Messenger \DummyMessage ;
23
+ use Symfony \Bundle \FrameworkBundle \Tests \Fixtures \Messenger \FooMessage ;
21
24
use Symfony \Bundle \FrameworkBundle \Tests \TestCase ;
22
25
use Symfony \Bundle \FullStack ;
23
26
use Symfony \Component \Cache \Adapter \AdapterInterface ;
30
33
use Symfony \Component \Cache \Adapter \RedisTagAwareAdapter ;
31
34
use Symfony \Component \Cache \Adapter \TagAwareAdapter ;
32
35
use Symfony \Component \Cache \DependencyInjection \CachePoolPass ;
36
+ use Symfony \Component \Clock \MockClock ;
33
37
use Symfony \Component \Config \Definition \Exception \InvalidConfigurationException ;
34
38
use Symfony \Component \DependencyInjection \Argument \ServiceClosureArgument ;
35
39
use Symfony \Component \DependencyInjection \Argument \ServiceLocatorArgument ;
36
40
use Symfony \Component \DependencyInjection \Argument \TaggedIteratorArgument ;
37
41
use Symfony \Component \DependencyInjection \ChildDefinition ;
42
+ use Symfony \Component \DependencyInjection \Compiler \AttributeAutoconfigurationPass ;
38
43
use Symfony \Component \DependencyInjection \Compiler \CompilerPassInterface ;
44
+ use Symfony \Component \DependencyInjection \Compiler \PassConfig ;
45
+ use Symfony \Component \DependencyInjection \Compiler \ResolveChildDefinitionsPass ;
39
46
use Symfony \Component \DependencyInjection \Compiler \ResolveInstanceofConditionalsPass ;
47
+ use Symfony \Component \DependencyInjection \Compiler \ResolveTaggedIteratorArgumentPass ;
48
+ use Symfony \Component \DependencyInjection \Compiler \ServiceLocatorTagPass ;
40
49
use Symfony \Component \DependencyInjection \ContainerBuilder ;
41
50
use Symfony \Component \DependencyInjection \ContainerInterface ;
42
51
use Symfony \Component \DependencyInjection \Definition ;
55
64
use Symfony \Component \HttpClient \ScopingHttpClient ;
56
65
use Symfony \Component \HttpKernel \DependencyInjection \LoggerPass ;
57
66
use Symfony \Component \HttpKernel \Fragment \FragmentUriGeneratorInterface ;
67
+ use Symfony \Component \Messenger \Bridge \Schedule \Transport \PeriodicalJob ;
68
+ use Symfony \Component \Messenger \Bridge \Schedule \Transport \Schedule ;
69
+ use Symfony \Component \Messenger \Bridge \Schedule \Transport \ScheduleTransport ;
70
+ use Symfony \Component \Messenger \DependencyInjection \MessengerPass ;
58
71
use Symfony \Component \Messenger \Transport \TransportFactory ;
59
72
use Symfony \Component \Notifier \ChatterInterface ;
60
73
use Symfony \Component \Notifier \TexterInterface ;
@@ -740,6 +753,13 @@ public function testWebLink()
740
753
public function testMessengerServicesRemovedWhenDisabled ()
741
754
{
742
755
$ container = $ this ->createContainerFromFile ('messenger_disabled ' );
756
+ $ messengerDefinitions = array_filter (
757
+ $ container ->getDefinitions (),
758
+ static fn ($ name ) => str_starts_with ($ name , 'messenger. ' ),
759
+ \ARRAY_FILTER_USE_KEY
760
+ );
761
+
762
+ $ this ->assertEmpty ($ messengerDefinitions );
743
763
$ this ->assertFalse ($ container ->hasDefinition ('console.command.messenger_consume_messages ' ));
744
764
$ this ->assertFalse ($ container ->hasDefinition ('console.command.messenger_debug ' ));
745
765
$ this ->assertFalse ($ container ->hasDefinition ('console.command.messenger_stop_workers ' ));
@@ -772,14 +792,28 @@ public function testMessengerWithExplictResetOnMessageLegacy()
772
792
773
793
public function testMessenger ()
774
794
{
775
- $ container = $ this ->createContainerFromFile ('messenger ' );
795
+ $ container = $ this ->createContainerFromFile ('messenger ' , [], true , false );
796
+ $ container ->addCompilerPass (new ResolveTaggedIteratorArgumentPass ());
797
+ $ container ->compile ();
798
+
799
+ $ expectedFactories = [
800
+ new Reference ('messenger.transport.amqp.factory ' ),
801
+ new Reference ('messenger.transport.redis.factory ' ),
802
+ new Reference ('messenger.transport.sync.factory ' ),
803
+ new Reference ('messenger.transport.in_memory.factory ' ),
804
+ new Reference ('messenger.transport.sqs.factory ' ),
805
+ new Reference ('messenger.transport.beanstalkd.factory ' ),
806
+ new Reference ('messenger.transport.schedule.factory ' ),
807
+ ];
808
+
809
+ $ this ->assertTrue ($ container ->hasDefinition ('messenger.receiver_locator ' ));
776
810
$ this ->assertTrue ($ container ->hasDefinition ('console.command.messenger_consume_messages ' ));
777
811
$ this ->assertTrue ($ container ->hasAlias ('messenger.default_bus ' ));
778
812
$ this ->assertTrue ($ container ->getAlias ('messenger.default_bus ' )->isPublic ());
779
- $ this ->assertTrue ($ container ->hasDefinition ('messenger.transport.amqp.factory ' ));
780
- $ this ->assertTrue ($ container ->hasDefinition ('messenger.transport.redis.factory ' ));
781
813
$ this ->assertTrue ($ container ->hasDefinition ('messenger.transport_factory ' ));
782
814
$ this ->assertSame (TransportFactory::class, $ container ->getDefinition ('messenger.transport_factory ' )->getClass ());
815
+ $ this ->assertInstanceOf (TaggedIteratorArgument::class, $ container ->getDefinition ('messenger.transport_factory ' )->getArgument (0 ));
816
+ $ this ->assertEquals ($ expectedFactories , $ container ->getDefinition ('messenger.transport_factory ' )->getArgument (0 )->getValues ());
783
817
$ this ->assertTrue ($ container ->hasDefinition ('messenger.listener.reset_services ' ));
784
818
$ this ->assertSame ('messenger.listener.reset_services ' , (string ) $ container ->getDefinition ('console.command.messenger_consume_messages ' )->getArgument (5 ));
785
819
}
@@ -796,10 +830,7 @@ public function testMessengerWithoutConsole()
796
830
$ this ->assertFalse ($ container ->hasDefinition ('console.command.messenger_consume_messages ' ));
797
831
$ this ->assertTrue ($ container ->hasAlias ('messenger.default_bus ' ));
798
832
$ this ->assertTrue ($ container ->getAlias ('messenger.default_bus ' )->isPublic ());
799
- $ this ->assertTrue ($ container ->hasDefinition ('messenger.transport.amqp.factory ' ));
800
- $ this ->assertTrue ($ container ->hasDefinition ('messenger.transport.redis.factory ' ));
801
833
$ this ->assertTrue ($ container ->hasDefinition ('messenger.transport_factory ' ));
802
- $ this ->assertSame (TransportFactory::class, $ container ->getDefinition ('messenger.transport_factory ' )->getClass ());
803
834
$ this ->assertFalse ($ container ->hasDefinition ('messenger.listener.reset_services ' ));
804
835
}
805
836
@@ -924,6 +955,16 @@ public function testMessengerTransports()
924
955
925
956
$ this ->assertTrue ($ container ->hasDefinition ('messenger.transport.beanstalkd.factory ' ));
926
957
958
+ $ this ->assertTrue ($ container ->hasDefinition ('messenger.transport.schedule ' ));
959
+ $ transportFactory = $ container ->getDefinition ('messenger.transport.schedule ' )->getFactory ();
960
+ $ transportArguments = $ container ->getDefinition ('messenger.transport.schedule ' )->getArguments ();
961
+
962
+ $ this ->assertEquals ([new Reference ('messenger.transport_factory ' ), 'createTransport ' ], $ transportFactory );
963
+ $ this ->assertCount (3 , $ transportArguments );
964
+ $ this ->assertSame ('schedule://default ' , $ transportArguments [0 ]);
965
+
966
+ $ this ->assertTrue ($ container ->hasDefinition ('messenger.transport.schedule.factory ' ));
967
+
927
968
$ this ->assertSame (10 , $ container ->getDefinition ('messenger.retry.multiplier_retry_strategy.customised ' )->getArgument (0 ));
928
969
$ this ->assertSame (7 , $ container ->getDefinition ('messenger.retry.multiplier_retry_strategy.customised ' )->getArgument (1 ));
929
970
$ this ->assertSame (3 , $ container ->getDefinition ('messenger.retry.multiplier_retry_strategy.customised ' )->getArgument (2 ));
@@ -937,6 +978,7 @@ public function testMessengerTransports()
937
978
'default ' => new Reference ('messenger.transport.failed ' ),
938
979
'failed ' => new Reference ('messenger.transport.failed ' ),
939
980
'redis ' => new Reference ('messenger.transport.failed ' ),
981
+ 'schedule ' => new Reference ('messenger.transport.failed ' ),
940
982
];
941
983
942
984
$ failureTransportsReferences = array_map (function (ServiceClosureArgument $ serviceClosureArgument ) {
@@ -980,6 +1022,50 @@ public function testMessengerTransportConfiguration()
980
1022
$ this ->assertSame (['enable_max_depth ' => true ], $ serializerTransportDefinition ->getArgument (2 ));
981
1023
}
982
1024
1025
+ // fixme where to place it?
1026
+ public function testMessengerScheduler ()
1027
+ {
1028
+ DefaultSchedule::$ schedule = new Schedule (
1029
+ $ clock = new MockClock ('2020-01-01T00:09:59Z ' ),
1030
+ PeriodicalJob::create ($ foo = new FooMessage (), 600 , '2020-01-01T00:00:00Z ' ),
1031
+ PeriodicalJob::create ($ bar = new BarMessage (), 600 , '2020-01-01T00:01:00Z ' ),
1032
+ );
1033
+
1034
+ $ container = $ this ->createContainerFromFile ('messenger_schedule ' , [], true , false );
1035
+ $ container ->addCompilerPass (new AttributeAutoconfigurationPass ());
1036
+ $ container ->addCompilerPass (new ResolveInstanceofConditionalsPass ());
1037
+ $ container ->addCompilerPass (new ResolveChildDefinitionsPass (), PassConfig::TYPE_OPTIMIZE );
1038
+ $ container ->addCompilerPass (new ResolveTaggedIteratorArgumentPass (), PassConfig::TYPE_OPTIMIZE );
1039
+ $ container ->addCompilerPass (new ServiceLocatorTagPass (), PassConfig::TYPE_OPTIMIZE );
1040
+ $ container ->addCompilerPass (new MessengerPass ());
1041
+ $ container ->register (DefaultSchedule::class, DefaultSchedule::class)
1042
+ ->setAutoconfigured (true );
1043
+ $ container ->setAlias ('receivers ' , 'messenger.receiver_locator ' )
1044
+ ->setPublic (true );
1045
+ $ container ->compile ();
1046
+
1047
+ $ this ->assertTrue ($ container ->get ('receivers ' )->has ('schedule ' ));
1048
+ $ this ->assertInstanceOf (ScheduleTransport::class, $ cron = $ container ->get ('receivers ' )->get ('schedule ' ));
1049
+
1050
+ $ fetchMessages = static function (float $ sleep ) use ($ clock , $ cron ) {
1051
+ if (0 < $ sleep ) {
1052
+ $ clock ->sleep ($ sleep );
1053
+ }
1054
+ $ messages = [];
1055
+ foreach ($ cron ->get () as $ key => $ envelope ) {
1056
+ $ messages [$ key ] = $ envelope ->getMessage ();
1057
+ }
1058
+
1059
+ return $ messages ;
1060
+ };
1061
+
1062
+ $ this ->assertSame ([], $ fetchMessages (0.0 ));
1063
+ $ this ->assertSame ([$ foo ], $ fetchMessages (1.0 ));
1064
+ $ this ->assertSame ([], $ fetchMessages (1.0 ));
1065
+ $ this ->assertSame ([$ bar ], $ fetchMessages (60.0 ));
1066
+ $ this ->assertSame ([$ foo , $ bar ], $ fetchMessages (600.0 ));
1067
+ }
1068
+
983
1069
public function testMessengerWithMultipleBuses ()
984
1070
{
985
1071
$ container = $ this ->createContainerFromFile ('messenger_multiple_buses ' );
0 commit comments