دليل المبتدئين إلى RxJS و Redux Observable

Redux-Observable عبارة عن برنامج وسيط يستند إلى RxJS لـ Redux يسمح للمطورين بالعمل مع إجراءات غير متزامنة. إنه بديل عن ملحمة redux-thunk و redux-saga.

تتناول هذه المقالة أساسيات RxJS ، وكيفية إعداد Redux-Observables ، وبعض حالات الاستخدام العملية. لكن قبل ذلك ، نحتاج إلى فهم نمط المراقب .

نمط المراقب

في نمط المراقب ، يحتفظ الكائن المسمى "يمكن ملاحظته" أو "الموضوع" بمجموعة من المشتركين تسمى "المراقبون". عندما تتغير حالة الأشخاص ، فإنها تبلغ جميع مراقبيها.

في JavaScript ، سيكون أبسط مثال على ذلك هو بواعث الأحداث ومعالجات الأحداث.

عندما تفعل ذلك .addEventListener، فإنك تدفع بمراقب إلى مجموعة مراقبي الموضوع. كلما وقع الحدث ، فإن الموضوع يخطر جميع المراقبين.

RxJS

حسب الموقع الرسمي ،

RxJS هو تطبيق JavaScript لـ ReactiveX ، وهي مكتبة لإنشاء برامج غير متزامنة وقائمة على الأحداث باستخدام تسلسلات يمكن ملاحظتها.

بعبارات بسيطة ، RxJS هو تنفيذ لنمط Observer. كما أنه يوسع نمط المراقب من خلال توفير عوامل تشغيل تسمح لنا بتكوين الملاحظات والموضوعات بطريقة توضيحية.

المراقبون والمراقبون والمشغلون والموضوعات هم اللبنات الأساسية لـ RxJS. لذلك دعونا ننظر إلى كل واحد بمزيد من التفصيل الآن.

المراقبون

المراقبون هم كائنات يمكن أن تشترك في المرصدات والموضوعات. بعد الاشتراك ، يمكنهم تلقي إشعارات من ثلاثة أنواع - التالي ، والخطأ ، والكامل.

يمكن استخدام أي كائن له الهيكل التالي كمراقب.

interface Observer { closed?: boolean; next: (value: T) => void; error: (err: any) => void; complete: () => void; }

عندما الدفعات ملاحظتها المقبل، خطأ، والإخطارات كاملة، والمراقب ل .next، .errorو .completeيتم استدعاء الأساليب.

المراقبات

الأشياء المرصودة هي كائنات يمكنها إرسال بيانات خلال فترة زمنية. يمكن تمثيلها باستخدام "مخطط الرخام".

عندما يمثل الخط الأفقي الوقت ، تمثل العقد الدائرية البيانات الصادرة عن المرصد ، ويشير الخط العمودي إلى أن المرصد قد اكتمل بنجاح.

الملاحظات قد تواجه خطأ. يمثل التقاطع الخطأ المنبعث من المرصد.

تعتبر حالات "مكتمل" و "خطأ" نهائية. هذا يعني أنه لا يمكن لـ Observables إرسال أي بيانات بعد إكمالها بنجاح أو مواجهة خطأ.

إنشاء جديرة بالملاحظة

يتم إنشاء الملاحظات باستخدام الباني new Observableالذي يأخذ وسيطة واحدة - وظيفة الاشتراك. يمكن أيضًا إنشاء المراقبات باستخدام بعض المشغلين ، لكننا سنتحدث عن ذلك لاحقًا عندما نتحدث عن العملاء.

import { Observable } from 'rxjs'; const observable = new Observable(subscriber => { // Subscribe function });

الاشتراك في مشاهدة

يمكن الاشتراك .subscribeفي المراقبات باستخدام طريقتها وتمرير مراقب.

observable.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log('completed'); });

تنفيذ ملحوظة

يتم تنفيذ وظيفة الاشتراك التي مررناها إلى new Observableالمنشئ في كل مرة يتم فيها الاشتراك في Observable.

تأخذ وظيفة الاشتراك وسيطة واحدة - المشترك. المشترك يشبه هيكل مراقب، ولها نفس 3 طرق: .next، .errorو .complete.

يمكن للمراقبات دفع البيانات إلى المراقب باستخدام .nextالطريقة. إذا اكتملت الملاحظة بنجاح ، فيمكنها إخطار المراقب باستخدام .completeالطريقة. إذا واجهت Observable خطأً ، فيمكنها دفع الخطأ إلى Observer باستخدام .errorالطريقة.

// Create an Observable const observable = new Observable(subscriber => { subscriber.next('first data'); subscriber.next('second data'); setTimeout(() => { subscriber.next('after 1 second - last data'); subscriber.complete(); subscriber.next('data after completion'); //  console.log(x), error: (x) => console.log(x), complete: () => console.log('completed') }); // Outputs: // // first data // second data // third data // after 1 second - last data // completed

الملاحظات هي أحادية الإرسال

المرصدات هي أحادية الإرسال ، مما يعني أن المراقبين يمكن أن يكون لهم مشترك واحد على الأكثر. عندما يشترك مراقب في Observable ، فإنه يحصل على نسخة من Observable لها مسار تنفيذ خاص بها ، مما يجعل Observables أحادية البث.

إنه مثل مشاهدة مقطع فيديو على YouTube. يشاهد جميع المشاهدين محتوى الفيديو نفسه ، لكن يمكنهم مشاهدة مقاطع مختلفة من الفيديو.

مثال : دعنا ننشئ ملاحظة تنبعث من 1 إلى 10 خلال 10 ثوانٍ. ثم اشترك في Observable مرة واحدة على الفور ، ومرة ​​أخرى بعد 5 ثوانٍ.

// Create an Observable that emits data every second for 10 seconds const observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000); }); // Subscribe to the Observable observable.subscribe({ next: value => { console.log(`Observer 1: ${value}`); } }); // After 5 seconds subscribe again setTimeout(() => { observable.subscribe({ next: value => { console.log(`Observer 2: ${value}`); } }); }, 5000); /* Output Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 1: 4 Observer 1: 5 Observer 2: 1 Observer 1: 6 Observer 2: 2 Observer 1: 7 Observer 2: 3 Observer 1: 8 Observer 2: 4 Observer 1: 9 Observer 2: 5 Observer 1: 10 Observer 2: 6 Observer 2: 7 Observer 2: 8 Observer 2: 9 Observer 2: 10 */

في الإخراج ، يمكنك ملاحظة أن المراقب الثاني بدأ الطباعة من 1 على الرغم من اشتراكه بعد 5 ثوانٍ. يحدث هذا لأن المراقب الثاني تلقى نسخة من المرصد الذي تم استدعاء وظيفة الاشتراك الخاصة به مرة أخرى. يوضح هذا السلوك الأحادي للبث.

المواضيع

الموضوع هو نوع خاص من الملاحظة.

خلق موضوع

يتم إنشاء موضوع باستخدام new Subjectالمنشئ.

import { Subject } from 'rxjs'; // Create a subject const subject = new Subject();

الاشتراك في موضوع

يشبه الاشتراك في موضوع الاشتراك في "جدير بالملاحظة": يمكنك استخدام .subscribeالطريقة وتمرير "مراقب".

subject.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log("done") });

تنفيذ الموضوع

على عكس المتغيرات الظاهرة، ويدعو موضوع الخاصة بها .next، .errorو .completeطرق لدفع البيانات إلى المراقبين.

// Push data to all observers subject.next('first data'); // Push error to all observers subject.error('oops something went wrong'); // Complete subject.complete('done');

الموضوعات متعددة

الموضوعات متعددة البث: يشترك العديد من المراقبين في نفس الموضوع ومسار تنفيذه. هذا يعني أن جميع الإخطارات يتم بثها إلى جميع المراقبين. إنه مثل مشاهدة برنامج مباشر. يشاهد جميع المشاهدين نفس المقطع من نفس المحتوى في نفس الوقت.

مثال: دعنا ننشئ موضوعًا يصدر من 1 إلى 10 خلال 10 ثوانٍ. ثم اشترك في Observable مرة واحدة على الفور ، ومرة ​​أخرى بعد 5 ثوانٍ.

// Create a subject const subject = new Subject(); let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000); // Subscribe to the subjects subject.subscribe(data => { console.log(`Observer 1: ${data}`); }); // After 5 seconds subscribe again setTimeout(() => { subject.subscribe(data => { console.log(`Observer 2: ${data}`); }); }, 5000); /* OUTPUT Observer 1: 1 Observer 1: 2 Observer 1: 3 Observer 1: 4 Observer 1: 5 Observer 2: 5 Observer 1: 6 Observer 2: 6 Observer 1: 7 Observer 2: 7 Observer 1: 8 Observer 2: 8 Observer 1: 9 Observer 2: 9 Observer 1: 10 Observer 2: 10 */ 

في الإخراج ، يمكنك ملاحظة أن المراقب الثاني بدأ الطباعة من 5 بدلاً من البدء من 1. يحدث هذا لأن المراقب الثاني يشارك نفس الموضوع. نظرًا لأنه تم الاشتراك بعد 5 ثوانٍ ، فقد انتهى الموضوع بالفعل من إرسال 1 إلى 4. وهذا يوضح سلوك الإرسال المتعدد للموضوع.

الموضوعات يمكن ملاحظتها والمراقبة

موضوعات لها .next، .errorو .completeالأساليب. هذا يعني أنهم يتبعون هيكل المراقبين. ومن ثم ، يمكن أيضًا استخدام موضوع ما كمراقب وتمريره إلى .subscribeوظيفة المرصودات أو الموضوعات الأخرى.

مثال: دعونا ننشئ موضوع قابل للملاحظة وموضوع. ثم اشترك في المرصد باستخدام الموضوع كمراقب. أخيرًا ، اشترك في الموضوع. سيتم دفع جميع القيم المنبعثة من المرصد إلى الموضوع ، وسيقوم الموضوع ببث القيم المستلمة لجميع مراقبيه.

// Create an Observable that emits data every second const observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 5) { clearInterval(interval); } }, 1000); }); // Create a subject const subject = new Subject(); // Use the Subject as Observer and subscribe to the Observable observable.subscribe(subject); // Subscribe to the subject subject.subscribe({ next: value => console.log(value) }); /* Output 1 2 3 4 5 */

العاملين

العوامل التي تجعل RxJS مفيدة. العوامل هي وظائف خالصة ترجع إلى Observable جديد. يمكن تصنيفها إلى فئتين رئيسيتين:

  1. عوامل الإنشاء
  2. مشغلي الأنابيب

عوامل الإنشاء

عوامل الإنشاء هي وظائف يمكنها إنشاء مرصد جديد.

مثال: يمكننا إنشاء Observable يرسل كل عنصر من عناصر المصفوفة باستخدام fromعامل التشغيل.

const observable = from([2, 30, 5, 22, 60, 1]); observable.subscribe({ next: (value) => console.log("Received", value), error: (err) => console.log(err), complete: () => console.log("done") }); /* OUTPUTS Received 2 Received 30 Received 5 Received 22 Received 60 Received 1 done */

يمكن ملاحظة نفس الشيء باستخدام مخطط الرخام.

مشغلي الأنابيب

العوامل التي يمكن ملاحظتها هي وظائف تأخذ الملاحظة كمدخل وتعيد ملاحظة جديدة مع سلوك معدل.

مثال: لنأخذ الملاحظة التي أنشأناها باستخدام fromعامل التشغيل. الآن باستخدام هذا المرصد ، يمكننا إنشاء مرصد جديد لا يصدر سوى أرقام أكبر من 10 باستخدام filterعامل التشغيل.

const greaterThanTen = observable.pipe(filter(x => x > 10)); greaterThanTen.subscribe(console.log, console.log, () => console.log("completed")); // OUTPUT // 11 // 12 // 13 // 14 // 15

يمكن تمثيل نفس الشيء باستخدام مخطط الرخام.

هناك العديد من المشغلين المفيدين هناك. يمكنك الاطلاع على قائمة المشغلين الكاملة إلى جانب أمثلة على وثائق RxJS الرسمية هنا.

من الأهمية بمكان فهم جميع المشغلين الأكثر استخدامًا. فيما يلي بعض العوامل التي أستخدمها كثيرًا:

  1. mergeMap
  2. switchMap
  3. exhaustMap
  4. map
  5. catchError
  6. startWith
  7. delay
  8. debounce
  9. throttle
  10. interval
  11. from
  12. of

ملاحظات إحياء

حسب الموقع الرسمي ،

البرامج الوسيطة المستندة إلى RxJS لـ Redux. قم بتكوين وإلغاء الإجراءات غير المتزامنة لإنشاء تأثيرات جانبية والمزيد.

في Redux ، كلما تم إرسال إجراء ، فإنه يمر عبر جميع وظائف المخفض ويتم إرجاع حالة جديدة.

تأخذ إعادة الملحوظة كل هذه الإجراءات المرسلة والحالات الجديدة وتخلق منها ملاحظتين - الإجراءات الملحوظة action$والدول التي يمكن ملاحظتها state$.

ستطلق الإجراءات التي يمكن ملاحظتها جميع الإجراءات التي يتم إرسالها باستخدام ملف store.dispatch(). ستنبعث الدول التي يمكن ملاحظتها جميع كائنات الحالة الجديدة التي يتم إرجاعها بواسطة مخفض الجذر.

الملاحم

حسب الموقع الرسمي ،

إنها وظيفة تأخذ سلسلة من الإجراءات وتعيد سلسلة من الإجراءات. الإجراءات في الإجراءات.

Epics are functions that can be used to subscribe to Actions and States Observables. Once subscribed, epics will receive the stream of actions and states as input, and it must return a stream of actions as an output. Actions In - Actions Out.

const someEpic = (action$, state$) => { return action$.pipe( // subscribe to actions observable map(action => { // Receive every action, Actions In return someOtherAction(); // return an action, Actions Out }) ) }

It is important to understand that all the actions received in the Epic have already finished running through the reducers.

Inside an Epic, we can use any RxJS observable patterns, and this is what makes redux-observables useful.

Example: we can use the .filter operator to create a new intermediate observable. Similarly, we can create any number of intermediate observables, but the final output of the final observable must be an action, otherwise an exception will be raised by redux-observable.

const sampleEpic = (action$, state$) => { return action$.pipe( filter(action => action.payload.age >= 18), // can create intermediate observables and streams map(value => above18(value)) // where above18 is an action creator ); }

Every action emitted by the Epics are immediately dispatched using the store.dispatch().

Setup

First, let's install the dependencies.

npm install --save rxjs redux-observable

Create a separate folder named epics to keep all the epics. Create a new file index.js inside the epics folder and combine all the epics using the combineEpics function to create the root epic. Then export the root epic.

import { combineEpics } from 'redux-observable'; import { epic1 } from './epic1'; import { epic2 } from './epic2'; const epic1 = (action$, state$) => { ... } const epic2 = (action$, state$) => { ... } export default combineEpics(epic1, epic2);

Create an epic middleware using the createEpicMiddleware function and pass it to the createStore Redux function.

import { createEpicMiddleware } from 'redux-observable'; import { createStore, applyMiddleware } from 'redux'; import rootEpic from './rootEpics'; const epicMiddleware = createEpicMiddlware(); const store = createStore( rootReducer, applyMiddleware(epicMiddlware) );

Finally, pass the root epic to epic middleware's .run method.

epicMiddleware.run(rootEpic);

Some Practical Usecases

RxJS has a big learning curve, and the redux-observable setup worsens the already painful Redux setup process. All that makes Redux observable look like an overkill. But here are some practical use cases that can change your mind.

Throughout this section, I will be comparing redux-observables with redux-thunk to show how redux-observables can be helpful in complex use-cases. I don't hate redux-thunk, I love it, and I use it every day!

1. Make API Calls

Usecase: Make an API call to fetch comments of a post. Show loaders when the API call is in progress and also handle API errors.

A redux-thunk implementation will look like this,

function getComments(postId){ return (dispatch) => { dispatch(getCommentsInProgress()); axios.get(`/v1/api/posts/${postId}/comments`).then(response => { dispatch(getCommentsSuccess(response.data.comments)); }).catch(() => { dispatch(getCommentsFailed()); }); } }

and this is absolutely correct. But the action creator is bloated.

We can write an Epic to implement the same using redux-observables.

const getCommentsEpic = (action$, state$) => action$.pipe( ofType('GET_COMMENTS'), mergeMap((action) => from(axios.get(`/v1/api/posts/${action.payload.postId}/comments`).pipe( map(response => getCommentsSuccess(response.data.comments)), catchError(() => getCommentsFailed()), startWith(getCommentsInProgress()) ) );

Now it allows us to have a clean and simple action creator like this,

function getComments(postId) { return { type: 'GET_COMMENTS', payload: { postId } } }

2. Request Debouncing

Usecase: Provide autocompletion for a text field by calling an API whenever the value of the text field changes. API call should be made 1 second after the user has stopped typing.

A redux-thunk implementation will look like this,

let timeout; function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { axios.get(`/suggestions?q=${value}`) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); } }

It requires a global variable timeout. When we start using global variables, our action creators are not longer pure functions. It also becomes difficult to unit test the action creators that use a global variable.

We can implement the same with redux-observable using the .debounce operator.

const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), debounce(1000), mergeMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()) );

Now, our action creators can be cleaned up, and more importantly, they can be pure functions again.

function valueChanged(value) { return { type: 'VALUE_CHANGED', payload: { value } } }

3. Request Cancellation

Usecase: Continuing the previous use-case, assume that the user didn't type anything for 1 second, and we made our 1st API call to fetch the suggestions.

Let's say the API itself takes an average of 2-3 seconds to return the result. Now, if the user types something while the 1st API call is in progress, after 1 second, we will make our 2nd API. We can end up having two API calls at the same time, and it can create a race condition.

To avoid this, we need to cancel the 1st API call before making the 2nd API call.

A redux-thunk implementation will look like this,

let timeout; var cancelToken = axios.cancelToken; let apiCall; function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { // Cancel the existing API apiCall && apiCall.cancel('Operation cancelled'); // Generate a new token apiCall = cancelToken.source(); axios.get(`/suggestions?q=${value}`, { cancelToken: apiCall.token }) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); } }

Now, it requires another global variable to store the Axios's cancel token. More global variables = more impure functions!

To implement the same using redux-observable, all we need to do is replace .mergeMap with .switchMap.

const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), throttle(1000), switchMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()) );

Since it doesn't require any changes to our action creators, they can continue to be pure functions.

وبالمثل ، هناك العديد من حالات الاستخدام التي يتألق فيها Redux-Observables بالفعل! على سبيل المثال ، استقصاء واجهة برمجة تطبيقات ، وإظهار أشرطة الوجبات الخفيفة ، وإدارة اتصالات WebSocket ، إلخ.

ليستنتج

إذا كنت تقوم بتطوير تطبيق Redux يتضمن حالات استخدام معقدة ، يوصى بشدة باستخدام Redux-Observables. بعد كل شيء ، تتناسب فوائد استخدامه بشكل مباشر مع مدى تعقيد تطبيقك ، ويتضح من حالات الاستخدام العملية المذكورة أعلاه.

أعتقد بشدة أن استخدام المجموعة الصحيحة من المكتبات سيساعدنا على تطوير تطبيقات أكثر نظافة وقابلة للصيانة ، وعلى المدى الطويل ، ستفوق فوائد استخدامها العيوب.