@@ -291,57 +291,7 @@ public Single<ValueT> execute(
291291 Action onAlreadyRunning ,
292292 Action onAlreadyFinished ,
293293 boolean force ) {
294- return Single .create (
295- emitter -> {
296- synchronized (lock ) {
297- if (state != STATE_ACTIVE ) {
298- emitter .onError (new CancellationException ("already shutdown" ));
299- return ;
300- }
301-
302- if (!force && finished .containsKey (key )) {
303- onAlreadyFinished .run ();
304- emitter .onSuccess (finished .get (key ));
305- return ;
306- }
307-
308- finished .remove (key );
309-
310- Execution execution = inProgress .get (key );
311- if (execution != null ) {
312- onAlreadyRunning .run ();
313- } else {
314- execution = new Execution (key , task );
315- inProgress .put (key , execution );
316- }
317-
318- // We must subscribe the execution within the scope of lock to avoid race condition
319- // that:
320- // 1. Two callers get the same execution instance
321- // 2. One decides to dispose the execution, since no more observers, the execution
322- // will change to the terminate state
323- // 3. Another one try to subscribe, will get "terminated" error.
324- execution .subscribe (
325- new SingleObserver <ValueT >() {
326- @ Override
327- public void onSubscribe (@ NonNull Disposable d ) {
328- emitter .setDisposable (d );
329- }
330-
331- @ Override
332- public void onSuccess (@ NonNull ValueT valueT ) {
333- emitter .onSuccess (valueT );
334- }
335-
336- @ Override
337- public void onError (@ NonNull Throwable e ) {
338- if (!emitter .isDisposed ()) {
339- emitter .onError (e );
340- }
341- }
342- });
343- }
344- });
294+ return task
345295 }
346296
347297 /**
0 commit comments