האזנה לערוץ בתוך כמה פונקציות במקביל
-
@nigun הקוד שהבאתי הוא בסיס מופשט להבנת הענין
package main import ( "fmt" "time" ) var ( // הערוץ הראשי שאליו מוזרם המידע mainChan = make(chan string) // מערך של ערוצי משנה // כל פונקציה שתרצה לקבל מידע מהערוץ הראשי תצטרך ליצור ערוץ משנה ולהוסיף אותו למערך הזה channels = []chan string{} ) func main() { go func() { // פונקציה היחידה שמקבל את המידע מהערוץ הראשי ומזרימה אותו לערוצי המשנה msg := <-mainChan for _, c := range channels { c <- msg } }() go func(c chan string) { time.Sleep(2 * time.Second) c <- "foo" }(mainChan) go someThread() go someThread() go someThread() go someThread() time.Sleep(4 * time.Second) } func someThread() { c := addListener() s := <-c fmt.Println(s) } func addListener() chan string { // הפונקציה שתפקידה ליצור את ערוצי המשנה ולהוסיף אותם למערך כדי להאזין למידע מהערוץ הראשי newChan := make(chan string) channels = append(channels, newChan) return newChan }
זה עובד טוב בשביל פונקציות חד פעמיות, אם תרצה להפוך את זה למאזינים רב פעמיים, לא צריך לעשות הרבה, רק להוסיף לולאות אינסופיות שמאזינות למידע מהערוץ, כך שברגע שיתקבל מידע מהערוץ הפונקציה תעשה את המוטל עליה ומיד אחר כך תמשיך לאיטרציה הבאה בלולאה ששוב מאזינה לערוץ:
package main import ( "fmt" "time" ) var ( mainChan = make(chan string) channels = []chan string{} ) func main() { go func() { for { // << לולאה אינסופית שמאזינה לערוץ הראשי msg := <-mainChan for _, c := range channels { c <- msg } } }() go func(c chan string) { time.Sleep(1 * time.Second) c <- "foo" time.Sleep(1 * time.Second) c <- "foo" // << שליחת מידע לערוץ הראשי פעם נוספת }(mainChan) go someThread() go someThread() go someThread() go someThread() time.Sleep(4 * time.Second) } func someThread() { c := addListener() for { // << לולאה אינסופית שמאזינה לערוץ המשנה s := <-c fmt.Println(s) } } func addListener() chan string { newChan := make(chan string) channels = append(channels, newChan) return newChan }
עד כאן הכל פשוט, הבעיה מתחילה בסיטואציה של השואל בסטאק, שיתכנו מאזינים שנרשמו לערוצי המשנה, ובהמשך הם יתנתקו וירדו מההאזנה, הבעיה היא שכל מאזין מריץ לולאה אינסופית, ואם לא תעצור את הלולאה כשהוא יתנתק זה יגרום לשימוש מיותר בזיכרון. בשביל זה הוא צריך להוסיף לכל מאזין גם תנאי עצירה, ולכן הוא הוסיף את הערוץ quit, וזה סיבך את הקוד.
האם אתה גם צריך את זה? זה תלוי באופי השימוש של המאזינים שאתה מקים, אם הם אמורים לרוץ כל חיי האפליקציה, אז זה מיותר, אם יש סיכוי שחלק מהם או כולם יגמרו את התפקיד שלהם וימותו, אז תצטרך לדאוג לעצירת הלולאה.
גם למקרה שצריך לעצור את הלולאה, אם המקרה של סיום התפקיד של הפונקציה ידוע וצפוי מראש, לא תצטרך להשתמש בשיטה של הערוץ quit, אלא פשוט תעצור את הלולאה בעצמך, אם זה לא תלוי בידך אלא זה מקרה לא צפוי (כמו במקרה של השואל שם שמשתמש יתנתק) לזה נועד השימוש בערוץ ובselect
בעיה נוספת שהעלו שם, אם המאזינים נרשמים ומתווספים למערך בו בזמן שהערוץ בפעולה, והפתרון שהוא הציע זה לנעול את הערוץ עם
sync.Mutex
בזמן שמוסיפים מאזין ולשחרר אותו אחר כך, אבל לא הצלחתי להבין מה בדיוק יגרום לבעיה ולמה צריך את הפיתרון. -
@יוסף-בן-שמעון
עזרת לי מאוד
אבל נראה לי שיש כאן בעיה
כי אם אני מוסיף עוד מאזינים אחרי הקריאה הראשונה
הם לא מצליחים להאזין
משהו כזה:go someThread() go someThread() time.Sleep(1 * time.Second) //קריאה ראשונה לערוץ הראשי func(c chan string) { c <- "foo" }(mainChan) time.Sleep(1 * time.Second) // יצירת מאזנים חדשים go someThread() go someThread() // קריאה שניה לערוץ הראשי func(c chan string) { c <- "foo2" }(mainChan) time.Sleep(4 * time.Second) } func someThread() { c := addListener() //for { // שים לב שכאן אני מאזין רק פעם אחת לערוץ s := <-c fmt.Println(s) // } }
נראה לי שאחרי שנעשה שימוש במערך של הערוצים משום מה אי אפשר להוסיף לו עוד אברים.
-
@nigun הקוד הזה עובד מצוין
https://play.golang.org/p/YOYhQerLKFd
כמובן שאחרי המידע הראשון שנשלח לערוץ המאזין נסגר ואי אפשר לשלוח עוד מידע, אבל המידע הראשון מתקבל גם לפונקציות המאוחרות.
הקוד שהעלת קטוע, אולי תעלה דוגמת קוד שלימה עם בעיה -
@nigun טעיתי, בקוד הזה באמת שליחת המידע מתבצעת רק אחרי שכל המאזינים נרשמים, אם נעשה את זה כך:
https://play.golang.org/p/hjKVDAq2hLT
לא נראה את המידע השני, אבל זה לא בגלל שאי אפשר להוסיף איברים למערך, אלא בגלל שבזמן שאמור להישלח המידע הקומפיילר עדיין באמצע השינה, וכשהוא מתעורר מהשינה כבר מאוחר כי הפונקציה main כבר מתה.
ניסיתי לעשות את זה עם שרת HTTP וזה נראה עובד בסדר גמורpackage main import ( "fmt" "net/http" ) var ( mainChan = make(chan string) channels = []chan string{} ) func main() { go func() { for { msg := <-mainChan for _, c := range channels { c <- msg } } }() http.HandleFunc("/a", sendDataToChannel) http.HandleFunc("/b", someThread) http.ListenAndServe(":8090", nil) } func sendDataToChannel(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "") mainChan <- "foo" } func someThread(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "") go func() { c := addListener() // for { s := <-c fmt.Println(s) // } }() } func addListener() chan string { newChan := make(chan string) channels = append(channels, newChan) return newChan }
-
@יוסף-בן-שמעון
זה לא עובד לי
אני נכנס קודם לכתובת
http://127.0.0.1:8090/b
פעמיים
ואז פעם אחת ל
http://127.0.0.1:8090/a
ואני מקבל את הפלט (בטרמינל)
foo
foo
עד כאן הכל טוב ויפה
אבל אם אני מנסה להיכנס שוב ל
http://127.0.0.1:8090/b
ואז שוב ל
http://127.0.0.1:8090/a
לא קורה כלום -
@יוסף-בן-שמעון
מצאתי את הפתרון
פשוט הערוצים הישניםמתוכבר לא מאזינים,אבל עדיין נמצאים במערך
וכיוון שהם הראשונים במערך, הלולאה מחכה לשדר למאזין הראשון במערך ולא ממשיכה הלאה.
הפתרון הוא להסיר את הערוץ מהמערך אחרי שהוא מפסיק להאזיןfunc someThread(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "") go func() { c := addListener() s := <-c fmt.Println(s) //קודם כל אני סוגר את הערוץ (לא בטוח שזה חכם) close(c) // אני מריץ ללואה שמסירה את הערוץ המת מהמערך for i := 0; i < len(channels); i++ { chanvar := channels[i] if chanvar == c { channels = append(channels[:i], channels[i+1:]...) i-- // Important: decrease index break } } }() }
השאלה היא מה יקרה אם יגיע קריאה חדשה לערוץ הראשי תוך כדי שאני סוגר את הערוץ לפני שאני מסיר אותו מהמערך
לכאורה אני יקבל שגיאהpanic: send on closed channel
כי אני מנסה לשלוח מידע לערוץ סגור
הפרון הוא אולי לנעול את המאזין הראשי כשאני מסיר איבר מהמערך
השאלה היא איך אני מדבג את זה?
לבניים אני מוותר על סגירת הערוץ, ורק מוחק אותו מהמערך
השאלה היא מה ההשלכות? -
@nigun אמר בהאזנה לערוץ בתוך כמה פונקציות במקביל:
לבניים אני מוותר על סגירת הערוץ, ורק מוחק אותו מהמערך
השאלה היא מה ההשלכות?לפי מה שכתוב פה
https://stackoverflow.com/questions/8593645/is-it-ok-to-leave-a-channel-open
אין צורך לסגור את הערוץ, מספיק לנקות את המצביע אליו ואיסוף הזבל ידאג לשאר -
@יוסף-בן-שמעון
איך אני יאזין למשתנה בין הערוצים? עם האזנה אינסופית עם if ובמקרה שהמשתנה שונה מהקריאה הקודמת לעשות פעולה כל שהיא? -
@nigun אני מדבר על משהו כזה
package main import ( "fmt" "time" ) var data string func main() { c := make(chan string) go func(c chan string) { time.Sleep(2 * time.Second) data = "foo" close(c) }(c) go func() { <-c fmt.Println(data) }() go func() { <-c fmt.Println(data) }() time.Sleep(4 * time.Second) }
-
@יוסף-בן-שמעון
אני צריך שליחה רב פעמית
זאת אומרת שאני יוסיף ויוריד מאזינים (שיוכלו להאזין פעם אחת או יותר)
ולשלוח כל הזמן (עם שרת HTTP ) מידע שיעבור לכל המאזינים
את האמת אני צריך לשלוח את המידע לרוב רק למאזין אחד, (כי כל מאזין שייך למשתמש אחר)
ואני שולח את הבקשה בצורה כזאתUSER:DATA
ולמאזין אני בונה לולאה שמאזינה כל הזמן לערוץ
ואם היוזר שווה ליוזר שלו הוא עושה את הפעולה המבוקשת ואם לא הוא חוזר להאזין,
האם אני עובד נכון או לא? -
@nigun אמר בהאזנה לערוץ בתוך כמה פונקציות במקביל:
האם אני עובד נכון או לא?
אני מאמין שככל שתסביר יותר את הסיטואציה יגדלו הסיכויים שתהיה לך תשובה מדוייקת יותר. (סיטואציה, לא צרכים נגזרים מהסיטואציה)
-
@יוסף-בן-שמעון
אני לרוב מסתבך כשכותבים מידי הרבה פרטים, אבל אם זה יעזור אז מצויין.
אני בונה מרכזיה באסטריסק שבו אני רוצה לשלוט על חלק מהמאזינים
דהיינו לדחוף להם באמצע האזנה פקודות שונות דרך API משרת HTTP
(אמנם יש לאסטריסק API לשליחת פקודות כאלו ,אבל אני לא הצלחתי להשתמש בו למה שאני צריך)
לכן חשבתי ללכת על הכיוון של ערוצי GO שפשוט כל שיחה (שאני רוצה לשלוט עליה) תאזין לערוץ
ובAPI החיצוני אני ישלח את שם המאזין שבו אני רוצה לשלוט , ואת הפקודה שאני רוצה שהוא יבצע
כשמתקבל בקשה כל המאזינים יבצעו בדיקה האם הבקשה מופנית אליהם, ואם כן יבצעו אותה
לקינוח אני יכול להחזיר באותה צורה הודעה דרך ערוץ אחר לשרת HTTP שהפעולה התבצעה +מידע על התוצאה.בעיקרון נראה לי שהכל אמור לעבוד מצויין
ולא אמור להיות לי הרבה מאזינים בו זמנית כך שלא אמור להיווצר עומס חריג מהבדיקה שכולם יבצעו ביחד.
אני רק שואל האם אני לא מגזים בלולאניות הזאת ויש דרך יותר פשוט לעשות את זה? או שפיפסתי כאן משהו והקוד הזה יתקע לי במקרים מסויימים. -
@nigun סורי, לא הצלחתי להבין איך נכנסו הערוצים לתמונה, למה לא לעשות כמו שעשו כל הדורות מימות אסמבלי ועד ימינו, כשמגיע השלב שאתה רוצה לדחוף פקודה למאזין, קרא לפונקציה חיצונית, תעביר לה פרמטרים נצרכים (user, data) ותקבל את הערך המוחזר. (אולי בגלל שאני לא מכיר אסטריסק)
-
@יוסף-בן-שמעון
פשוט אין שלב שבו אני רוצה לקרוא למאזין
אני תמיד רוצה להשאיר את האופציה לקריאה פתוחה
למשל אם המאזין הרגע התחיל להקשיב לקובץ באורך של שעה, ואני רוצה להעיף אותו משם ,שיקשיב למשהו אחר
אני פשוט בונה את כל הפקודות בצורה אסינכרונית , דהיינו שמפעיל את הפקודה "השמע קובץ פלוני" אבל עם go לפני הפקודה
ומיד אחר כך ממשיך להמתין לפקודות שלי
ככה זה נראה בערך בקודfor { msg := <-c msgsplit := strings.Split(mesege, ":") if msgsplit[0] == user { filename := msgsplit[1] go myAgi.StreamFile(filename, "") }
בקוד הנ"ל המאזין מקבל את הערך מהערוץ בודק האם זה תואם את השם שלו
אם כן , הוא בודק מה הערך השני שנשלח (אחרי הנקודותיים)
ומשמיע את הקובתץ למאזין ומיד אחרי הפקודה להשמיע את הקובץ הוא חוזר להאזין. -
@nigun מה דעתך על שימוש במפה, שהמפתחות שלה הם המזהים של המשתמשים, נראה לי יותר קריא ונקי
כל משתמש שמתחבר אתה מקים לו ערוץ, וכשהוא יתנתק אתה דואג לנתק אותו כדי לשחרר את הערוץ ולעצור את ההאזנה האינסופיתpackage main import ( "fmt" "time" ) var usersChannels = make(map[string]chan string) func main() { go helloUser("A") go helloUser("B") time.Sleep(500) sendDataToUser("A", "Hi") sendDataToUser("B", "By") time.Sleep(500) disconnectUser("A") sendDataToUser("A", "Hi") time.Sleep(5000) } func helloUser(name string) { usersChannels[name] = make(chan string) // כאן אתה מוסיף את הערוץ של המשתמש למפה for { msg, isConnected := <-usersChannels[name] // מאזין למידע מהערוץ if !isConnected { // אם המידע הוא שהערוץ נסגר, זה אומר שהמשתמש התנתק ואפשר לעצור את הפונקציה return } fmt.Println(msg) } } func disconnectUser(name string) { // סוגר את הערוץ כדי לעצור את ההאזנה ומוחק את המשתמש מהמפה close(usersChannels[name]) delete(usersChannels, name) } func sendDataToUser(name, data string) { channel, hasUser := usersChannels[name] if hasUser { // כדי למנוע מצב שהמשתמש כבר התנתק והערוץ שלו לא קיים במפה channel <- data } }
-
@nigun אליבא דאמת שימוש בערוצים במקרה הזה סתם מכביד על הקוד, קח מוטציה קלילה יותר של זה, במקום ערוצים שמתי קולבקים
package main import ( "fmt" "time" ) var usersCallback = make(map[string]func(data string)) func main() { go helloUser("A") go helloUser("B") time.Sleep(500) sendDataToUser("A", "Hi") sendDataToUser("B", "By") time.Sleep(500) disconnectUser("A") sendDataToUser("A", "Hi") time.Sleep(5000) } func helloUser(name string) { usersCallback[name] = func(msg string) { fmt.Println(msg) } } func disconnectUser(name string) { delete(usersCallback, name) } func sendDataToUser(name, data string) { callback, hasUser := usersCallback[name] if hasUser { callback(data) } }
-
@יוסף-בן-שמעון
את האמת אני לא מכיר בכלל את הקונספט של קולבקים
כנראה אני אשב על זה מתי שהוא ללמוד את זה (לא נראה מסובך)